From eb0e2cb31bba88799e9cbe3268335acb239a889c Mon Sep 17 00:00:00 2001 From: florijan Date: Mon, 19 Mar 2018 14:42:32 +0100 Subject: [PATCH] Extract cpp from hpp in distributed, fix includes Summary: Also removed some convenience code that became obsolete. No logic changes. Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1303 --- src/CMakeLists.txt | 9 + src/database/graph_db_accessor.cpp | 11 +- src/database/indexes/key_index.hpp | 1 - src/database/indexes/label_property_index.hpp | 9 +- src/distributed/index_rpc_server.cpp | 33 ++ src/distributed/index_rpc_server.hpp | 34 +- src/distributed/remote_cache.cpp | 101 +++++ src/distributed/remote_cache.hpp | 106 +---- src/distributed/remote_data_manager.cpp | 59 +++ src/distributed/remote_data_manager.hpp | 78 +--- src/distributed/remote_data_rpc_clients.cpp | 26 ++ src/distributed/remote_data_rpc_clients.hpp | 43 +-- src/distributed/remote_data_rpc_server.cpp | 28 ++ src/distributed/remote_data_rpc_server.hpp | 26 +- src/distributed/remote_produce_rpc_server.cpp | 170 ++++++++ src/distributed/remote_produce_rpc_server.hpp | 169 +------- src/distributed/remote_pull_rpc_clients.cpp | 72 ++++ src/distributed/remote_pull_rpc_clients.hpp | 69 +--- .../remote_updates_rpc_clients.cpp | 125 ++++++ .../remote_updates_rpc_clients.hpp | 99 +---- src/distributed/remote_updates_rpc_server.cpp | 362 ++++++++++++++++++ src/distributed/remote_updates_rpc_server.hpp | 344 +---------------- src/storage/record_accessor.cpp | 1 + 23 files changed, 1092 insertions(+), 883 deletions(-) create mode 100644 src/distributed/index_rpc_server.cpp create mode 100644 src/distributed/remote_cache.cpp create mode 100644 src/distributed/remote_data_manager.cpp create mode 100644 src/distributed/remote_data_rpc_clients.cpp create mode 100644 src/distributed/remote_data_rpc_server.cpp create mode 100644 src/distributed/remote_produce_rpc_server.cpp create mode 100644 src/distributed/remote_pull_rpc_clients.cpp create mode 100644 src/distributed/remote_updates_rpc_clients.cpp create mode 100644 src/distributed/remote_updates_rpc_server.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 516a21a21..c9c0eba09 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -16,8 +16,17 @@ set(memgraph_src_files database/state_delta.cpp distributed/coordination_master.cpp distributed/coordination_worker.cpp + distributed/index_rpc_server.cpp distributed/plan_consumer.cpp distributed/plan_dispatcher.cpp + distributed/remote_cache.cpp + distributed/remote_data_manager.cpp + distributed/remote_data_rpc_clients.cpp + distributed/remote_data_rpc_server.cpp + distributed/remote_produce_rpc_server.cpp + distributed/remote_pull_rpc_clients.cpp + distributed/remote_updates_rpc_clients.cpp + distributed/remote_updates_rpc_server.cpp durability/paths.cpp durability/recovery.cpp durability/snapshooter.cpp diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index be5473f4d..1ee4b10b0 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -93,7 +93,7 @@ VertexAccessor GraphDbAccessor::InsertVertexIntoRemote( for (auto &kv : properties) vertex->properties_.set(kv.first, kv.second); db().remote_data_manager() - .Vertices(transaction_id()) + .Elements(transaction_id()) .emplace(gid, nullptr, std::move(vertex)); return VertexAccessor({gid, worker_id}, *this); } @@ -417,12 +417,12 @@ EdgeAccessor GraphDbAccessor::InsertEdge( transaction_id(), from, to, edge_type); from_updated = db().remote_data_manager() - .Vertices(transaction_id()) + .Elements(transaction_id()) .FindNew(from.gid()); // Create an Edge and insert it into the RemoteCache so we see it locally. db().remote_data_manager() - .Edges(transaction_id()) + .Elements(transaction_id()) .emplace( edge_address.gid(), nullptr, std::make_unique(from.address(), to.address(), edge_type)); @@ -446,8 +446,9 @@ EdgeAccessor GraphDbAccessor::InsertEdge( transaction_id(), from, db().storage().GlobalizedAddress(edge_address), to, edge_type); } - to_updated = - db().remote_data_manager().Vertices(transaction_id()).FindNew(to.gid()); + to_updated = db().remote_data_manager() + .Elements(transaction_id()) + .FindNew(to.gid()); } to_updated->in_.emplace( db_.storage().LocalizedAddressIfPossible(from.address()), edge_address, diff --git a/src/database/indexes/key_index.hpp b/src/database/indexes/key_index.hpp index 2f6198810..0819db5ee 100644 --- a/src/database/indexes/key_index.hpp +++ b/src/database/indexes/key_index.hpp @@ -3,7 +3,6 @@ #include "glog/logging.h" #include "data_structures/concurrent/concurrent_map.hpp" -#include "database/graph_db.hpp" #include "database/indexes/index_common.hpp" #include "mvcc/version_list.hpp" #include "storage/edge.hpp" diff --git a/src/database/indexes/label_property_index.hpp b/src/database/indexes/label_property_index.hpp index b09d07a1d..93a19ebf9 100644 --- a/src/database/indexes/label_property_index.hpp +++ b/src/database/indexes/label_property_index.hpp @@ -3,7 +3,7 @@ #include #include "data_structures/concurrent/concurrent_map.hpp" -#include "database/graph_db.hpp" +#include "data_structures/concurrent/concurrent_set.hpp" #include "database/indexes/index_common.hpp" #include "mvcc/version_list.hpp" #include "storage/edge.hpp" @@ -259,9 +259,10 @@ class LabelPropertyIndex { auto access = GetKeyStorage(key)->access(); // create the iterator startpoint based on the lower bound - auto start_iter = lower ? access.find_or_larger(make_index_bound( - lower, lower.value().IsInclusive())) - : access.begin(); + auto start_iter = lower + ? access.find_or_larger(make_index_bound( + lower, lower.value().IsInclusive())) + : access.begin(); // a function that defines if an entry staisfies the filtering predicate. // since we already handled the lower bound, we only need to deal with the diff --git a/src/distributed/index_rpc_server.cpp b/src/distributed/index_rpc_server.cpp new file mode 100644 index 000000000..6964ebcc6 --- /dev/null +++ b/src/distributed/index_rpc_server.cpp @@ -0,0 +1,33 @@ +#include "database/graph_db.hpp" +#include "database/graph_db_accessor.hpp" +#include "distributed/index_rpc_server.hpp" + +namespace distributed { + +IndexRpcServer::IndexRpcServer(database::GraphDb &db, + communication::rpc::Server &server) + : db_(db), rpc_server_(server) { + rpc_server_.Register([this](const BuildIndexReq &req) { + + database::LabelPropertyIndex::Key key{req.member.label, + req.member.property}; + database::GraphDbAccessor dba(db_, req.member.tx_id); + + if (db_.storage().label_property_index_.CreateIndex(key) == false) { + // If we are a distributed worker we just have to wait till the index + // (which should be in progress of being created) is created so that our + // return guarantess that the index has been built - this assumes that + // no worker thread that is creating an index will fail + while (!dba.LabelPropertyIndexExists(key.label_, key.property_)) { + // TODO reconsider this constant, currently rule-of-thumb chosen + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + } else { + dba.PopulateIndex(key); + dba.EnableIndex(key); + } + return std::make_unique(); + }); +} + +} // namespace distributed diff --git a/src/distributed/index_rpc_server.hpp b/src/distributed/index_rpc_server.hpp index 4f6095369..3aec58b2f 100644 --- a/src/distributed/index_rpc_server.hpp +++ b/src/distributed/index_rpc_server.hpp @@ -1,38 +1,18 @@ #pragma once -#include "database/graph_db.hpp" -#include "database/graph_db_accessor.hpp" -#include "distributed/index_rpc_messages.hpp" +namespace communication::rpc { +class Server; +} -using namespace database; +namespace database { +class GraphDb; +} namespace distributed { class IndexRpcServer { public: - IndexRpcServer(database::GraphDb &db, communication::rpc::Server &server) - : db_(db), rpc_server_(server) { - rpc_server_.Register([this](const BuildIndexReq &req) { - - LabelPropertyIndex::Key key{req.member.label, req.member.property}; - GraphDbAccessor dba(db_, req.member.tx_id); - - if (db_.storage().label_property_index_.CreateIndex(key) == false) { - // If we are a distributed worker we just have to wait till the index - // (which should be in progress of being created) is created so that our - // return guarantess that the index has been built - this assumes that - // no worker thread that is creating an index will fail - while (!dba.LabelPropertyIndexExists(key.label_, key.property_)) { - // TODO reconsider this constant, currently rule-of-thumb chosen - std::this_thread::sleep_for(std::chrono::microseconds(100)); - } - } else { - dba.PopulateIndex(key); - dba.EnableIndex(key); - } - return std::make_unique(); - }); - } + IndexRpcServer(database::GraphDb &db, communication::rpc::Server &server); private: database::GraphDb &db_; diff --git a/src/distributed/remote_cache.cpp b/src/distributed/remote_cache.cpp new file mode 100644 index 000000000..37eb80404 --- /dev/null +++ b/src/distributed/remote_cache.cpp @@ -0,0 +1,101 @@ + +#include "glog/logging.h" + +#include "database/storage.hpp" +#include "distributed/remote_cache.hpp" +#include "storage/edge.hpp" +#include "storage/vertex.hpp" + +namespace distributed { + +template +TRecord *RemoteCache::FindNew(gid::Gid gid) { + std::lock_guard guard{lock_}; + auto found = cache_.find(gid); + DCHECK(found != cache_.end()) + << "FindNew for uninitialized remote Vertex/Edge"; + auto &pair = found->second; + if (!pair.second) { + pair.second = std::unique_ptr(pair.first->CloneData()); + } + return pair.second.get(); +} + +template +void RemoteCache::FindSetOldNew(tx::transaction_id_t tx_id, + int worker_id, gid::Gid gid, + TRecord *&old_record, + TRecord *&new_record) { + { + std::lock_guard guard(lock_); + auto found = cache_.find(gid); + if (found != cache_.end()) { + old_record = found->second.first.get(); + new_record = found->second.second.get(); + return; + } + } + + auto remote = + remote_data_clients_.RemoteElement(worker_id, tx_id, gid); + LocalizeAddresses(*remote); + + // This logic is a bit strange because we need to make sure that someone + // else didn't get a response and updated the cache before we did and we + // need a lock for that, but we also need to check if we can now return + // that result - otherwise we could get incosistent results for remote + // FindSetOldNew + std::lock_guard guard(lock_); + auto it_pair = cache_.emplace( + gid, std::make_pair(std::move(remote), nullptr)); + + old_record = it_pair.first->second.first.get(); + new_record = it_pair.first->second.second.get(); +} + +template +void RemoteCache::emplace(gid::Gid gid, rec_uptr old_record, + rec_uptr new_record) { + if (old_record) LocalizeAddresses(*old_record); + if (new_record) LocalizeAddresses(*new_record); + + std::lock_guard guard{lock_}; + // We can't replace existing data because some accessors might be using + // it. + // TODO - consider if it's necessary and OK to copy just the data content. + auto found = cache_.find(gid); + if (found != cache_.end()) + return; + else + cache_[gid] = std::make_pair(std::move(old_record), std::move(new_record)); +} + +template +void RemoteCache::ClearCache() { + std::lock_guard guard{lock_}; + cache_.clear(); +} + +template <> +void RemoteCache::LocalizeAddresses(Vertex &vertex) { + auto localize_edges = [this](auto &edges) { + for (auto &element : edges) { + element.vertex = storage_.LocalizedAddressIfPossible(element.vertex); + element.edge = storage_.LocalizedAddressIfPossible(element.edge); + } + }; + + localize_edges(vertex.in_.storage()); + localize_edges(vertex.out_.storage()); +} + +template <> +void RemoteCache::LocalizeAddresses(Edge &edge) { + edge.from_ = storage_.LocalizedAddressIfPossible(edge.from_); + edge.to_ = storage_.LocalizedAddressIfPossible(edge.to_); +} + +template class RemoteCache; +template class RemoteCache; + +} // namespace distributed diff --git a/src/distributed/remote_cache.hpp b/src/distributed/remote_cache.hpp index 5082306c6..1278838a7 100644 --- a/src/distributed/remote_cache.hpp +++ b/src/distributed/remote_cache.hpp @@ -3,14 +3,12 @@ #include #include -#include "glog/logging.h" - -#include "database/storage.hpp" #include "distributed/remote_data_rpc_clients.hpp" -#include "storage/edge.hpp" #include "storage/gid.hpp" -#include "storage/vertex.hpp" -#include "transactions/transaction.hpp" + +namespace database { +class Storage; +} namespace distributed { @@ -34,78 +32,20 @@ class RemoteCache { /// Returns the new data for the given ID. Creates it (as copy of old) if /// necessary. - TRecord *FindNew(gid::Gid gid) { - std::lock_guard guard{lock_}; - auto found = cache_.find(gid); - DCHECK(found != cache_.end()) - << "FindNew for uninitialized remote Vertex/Edge"; - auto &pair = found->second; - if (!pair.second) { - pair.second = std::unique_ptr(pair.first->CloneData()); - } - return pair.second.get(); - } + TRecord *FindNew(gid::Gid gid); - /** - * For the Vertex/Edge with the given global ID, looks for the data visible - * from the given transaction's ID and command ID, and caches it. Sets the - * given pointers to point to the fetched data. Analogue to - * mvcc::VersionList::find_set_old_new. - */ + /// For the Vertex/Edge with the given global ID, looks for the data visible + /// from the given transaction's ID and command ID, and caches it. Sets the + /// given pointers to point to the fetched data. Analogue to + /// mvcc::VersionList::find_set_old_new. void FindSetOldNew(tx::transaction_id_t tx_id, int worker_id, gid::Gid gid, - TRecord *&old_record, TRecord *&new_record) { - { - std::lock_guard guard(lock_); - auto found = cache_.find(gid); - if (found != cache_.end()) { - old_record = found->second.first.get(); - new_record = found->second.second.get(); - return; - } - } + TRecord *&old_record, TRecord *&new_record); - auto remote = - remote_data_clients_.RemoteElement(worker_id, tx_id, gid); - LocalizeAddresses(*remote); + /// Sets the given records as (new, old) data for the given gid. + void emplace(gid::Gid gid, rec_uptr old_record, rec_uptr new_record); - // This logic is a bit strange because we need to make sure that someone - // else didn't get a response and updated the cache before we did and we - // need a lock for that, but we also need to check if we can now return - // that result - otherwise we could get incosistent results for remote - // FindSetOldNew - std::lock_guard guard(lock_); - auto it_pair = cache_.emplace( - gid, std::make_pair(std::move(remote), nullptr)); - - old_record = it_pair.first->second.first.get(); - new_record = it_pair.first->second.second.get(); - } - - /** Sets the given records as (new, old) data for the given gid. */ - void emplace(gid::Gid gid, rec_uptr old_record, rec_uptr new_record) { - if (old_record) LocalizeAddresses(*old_record); - if (new_record) LocalizeAddresses(*new_record); - - std::lock_guard guard{lock_}; - // We can't replace existing data because some accessors might be using - // it. - // TODO - consider if it's necessary and OK to copy just the data content. - auto found = cache_.find(gid); - if (found != cache_.end()) - return; - else - cache_[gid] = - std::make_pair(std::move(old_record), std::move(new_record)); - } - - /// Removes all the cached data. All the pointers to that data still held by - /// RecordAccessors will become invalid and must never be dereferenced after - /// this call. To make a RecordAccessor valid again Reconstruct must be - /// called on it. This is typically done after the command advanced. - void ClearCache() { - std::lock_guard guard{lock_}; - cache_.clear(); - } + /// Removes all the data from the cache. + void ClearCache(); private: database::Storage &storage_; @@ -120,22 +60,4 @@ class RemoteCache { void LocalizeAddresses(TRecord &record); }; -template <> -inline void RemoteCache::LocalizeAddresses(Vertex &vertex) { - auto localize_edges = [this](auto &edges) { - for (auto &element : edges) { - element.vertex = storage_.LocalizedAddressIfPossible(element.vertex); - element.edge = storage_.LocalizedAddressIfPossible(element.edge); - } - }; - - localize_edges(vertex.in_.storage()); - localize_edges(vertex.out_.storage()); -} - -template <> -inline void RemoteCache::LocalizeAddresses(Edge &edge) { - edge.from_ = storage_.LocalizedAddressIfPossible(edge.from_); - edge.to_ = storage_.LocalizedAddressIfPossible(edge.to_); -} } // namespace distributed diff --git a/src/distributed/remote_data_manager.cpp b/src/distributed/remote_data_manager.cpp new file mode 100644 index 000000000..be30a412a --- /dev/null +++ b/src/distributed/remote_data_manager.cpp @@ -0,0 +1,59 @@ +#include "distributed/remote_data_manager.hpp" +#include "database/storage.hpp" + +namespace distributed { + +template +RemoteCache &RemoteDataManager::GetCache(CacheT &collection, + tx::transaction_id_t tx_id) { + auto access = collection.access(); + auto found = access.find(tx_id); + if (found != access.end()) return found->second; + + return access + .emplace( + tx_id, std::make_tuple(tx_id), + std::make_tuple(std::ref(storage_), std::ref(remote_data_clients_))) + .first->second; +} + +template <> +RemoteCache &RemoteDataManager::Elements( + tx::transaction_id_t tx_id) { + return GetCache(vertices_caches_, tx_id); +} + +template <> +RemoteCache &RemoteDataManager::Elements( + tx::transaction_id_t tx_id) { + return GetCache(edges_caches_, tx_id); +} + +RemoteDataManager::RemoteDataManager( + database::Storage &storage, + distributed::RemoteDataRpcClients &remote_data_clients) + : storage_(storage), remote_data_clients_(remote_data_clients) {} + +void RemoteDataManager::ClearCacheForSingleTransaction( + tx::transaction_id_t tx_id) { + Elements(tx_id).ClearCache(); + Elements(tx_id).ClearCache(); +} + +void RemoteDataManager::ClearTransactionalCache( + tx::transaction_id_t oldest_active) { + auto vertex_access = vertices_caches_.access(); + for (auto &kv : vertex_access) { + if (kv.first < oldest_active) { + vertex_access.remove(kv.first); + } + } + auto edge_access = edges_caches_.access(); + for (auto &kv : edge_access) { + if (kv.first < oldest_active) { + edge_access.remove(kv.first); + } + } +} + +} // namespace distributed diff --git a/src/distributed/remote_data_manager.hpp b/src/distributed/remote_data_manager.hpp index 85ddd1d42..6eb1b1518 100644 --- a/src/distributed/remote_data_manager.hpp +++ b/src/distributed/remote_data_manager.hpp @@ -1,87 +1,49 @@ #pragma once #include "data_structures/concurrent/concurrent_map.hpp" -#include "database/storage.hpp" #include "distributed/remote_cache.hpp" #include "distributed/remote_data_rpc_clients.hpp" -#include "storage/edge.hpp" -#include "storage/vertex.hpp" #include "transactions/type.hpp" +class Vertex; +class Edge; + +namespace database { +class Storage; +} + namespace distributed { -/** Handles remote data caches for edges and vertices, per transaction. */ +/// Handles remote data caches for edges and vertices, per transaction. class RemoteDataManager { - // Helper, gets or inserts a data cache for the given transaction. - template - auto &GetCache(TCollection &collection, tx::transaction_id_t tx_id) { - auto access = collection.access(); - auto found = access.find(tx_id); - if (found != access.end()) return found->second; + template + using CacheT = ConcurrentMap>; - return access - .emplace( - tx_id, std::make_tuple(tx_id), - std::make_tuple(std::ref(storage_), std::ref(remote_data_clients_))) - .first->second; - } + // Helper, gets or inserts a data cache for the given transaction. + template + RemoteCache &GetCache(CacheT &collection, + tx::transaction_id_t tx_id); public: RemoteDataManager(database::Storage &storage, - distributed::RemoteDataRpcClients &remote_data_clients) - : storage_(storage), remote_data_clients_(remote_data_clients) {} - - /// Gets or creates the remote vertex cache for the given transaction. - auto &Vertices(tx::transaction_id_t tx_id) { - return GetCache(vertices_caches_, tx_id); - } - - /// Gets or creates the remote edge cache for the given transaction. - auto &Edges(tx::transaction_id_t tx_id) { - return GetCache(edges_caches_, tx_id); - } + distributed::RemoteDataRpcClients &remote_data_clients); /// Gets or creates the remote vertex/edge cache for the given transaction. template - auto &Elements(tx::transaction_id_t tx_id); + RemoteCache &Elements(tx::transaction_id_t tx_id); /// Removes all the caches for a single transaction. - void ClearCacheForSingleTransaction(tx::transaction_id_t tx_id) { - Vertices(tx_id).ClearCache(); - Edges(tx_id).ClearCache(); - } + void ClearCacheForSingleTransaction(tx::transaction_id_t tx_id); /// Clears the cache of local transactions that have expired. The signature of /// this method is dictated by `distributed::CacheCleaner`. - void ClearTransactionalCache(tx::transaction_id_t oldest_active) { - auto vertex_access = vertices_caches_.access(); - for (auto &kv : vertex_access) { - if (kv.first < oldest_active) { - vertex_access.remove(kv.first); - } - } - auto edge_access = edges_caches_.access(); - for (auto &kv : edge_access) { - if (kv.first < oldest_active) { - edge_access.remove(kv.first); - } - } - } + void ClearTransactionalCache(tx::transaction_id_t oldest_active); private: database::Storage &storage_; RemoteDataRpcClients &remote_data_clients_; - ConcurrentMap> vertices_caches_; - ConcurrentMap> edges_caches_; + CacheT vertices_caches_; + CacheT edges_caches_; }; -template <> -inline auto &RemoteDataManager::Elements(tx::transaction_id_t tx_id) { - return Vertices(tx_id); -} - -template <> -inline auto &RemoteDataManager::Elements(tx::transaction_id_t tx_id) { - return Edges(tx_id); -} } // namespace distributed diff --git a/src/distributed/remote_data_rpc_clients.cpp b/src/distributed/remote_data_rpc_clients.cpp new file mode 100644 index 000000000..6f6b90933 --- /dev/null +++ b/src/distributed/remote_data_rpc_clients.cpp @@ -0,0 +1,26 @@ +#include "distributed/remote_data_rpc_clients.hpp" +#include "distributed/remote_data_rpc_messages.hpp" +#include "storage/edge.hpp" +#include "storage/vertex.hpp" + +namespace distributed { + +template <> +std::unique_ptr RemoteDataRpcClients::RemoteElement( + int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) { + auto response = clients_.GetClientPool(worker_id).Call( + TxGidPair{tx_id, gid}); + CHECK(response) << "RemoteEdgeRpc failed"; + return std::move(response->name_output_); +} + +template <> +std::unique_ptr RemoteDataRpcClients::RemoteElement( + int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) { + auto response = clients_.GetClientPool(worker_id).Call( + TxGidPair{tx_id, gid}); + CHECK(response) << "RemoteVertexRpc failed"; + return std::move(response->name_output_); +} + +} // namespace distributed diff --git a/src/distributed/remote_data_rpc_clients.hpp b/src/distributed/remote_data_rpc_clients.hpp index 675020119..8fe794d95 100644 --- a/src/distributed/remote_data_rpc_clients.hpp +++ b/src/distributed/remote_data_rpc_clients.hpp @@ -3,42 +3,19 @@ #include #include -#include "distributed/coordination.hpp" -#include "distributed/remote_data_rpc_messages.hpp" #include "distributed/rpc_worker_clients.hpp" #include "storage/gid.hpp" #include "transactions/type.hpp" namespace distributed { -/** Provides access to other worker's data. */ +/// Provides access to other worker's data. class RemoteDataRpcClients { public: RemoteDataRpcClients(RpcWorkerClients &clients) : clients_(clients) {} - - /// Returns a remote worker's data for the given params. That worker must own - /// the vertex for the given id, and that vertex must be visible in given - /// transaction. - std::unique_ptr RemoteVertex(int worker_id, - tx::transaction_id_t tx_id, - gid::Gid gid) { - auto response = clients_.GetClientPool(worker_id).Call( - TxGidPair{tx_id, gid}); - CHECK(response) << "RemoteVertexRpc failed"; - return std::move(response->name_output_); - } - - /// Returns a remote worker's data for the given params. That worker must own - /// the edge for the given id, and that edge must be visible in given - /// transaction. - std::unique_ptr RemoteEdge(int worker_id, tx::transaction_id_t tx_id, - gid::Gid gid) { - auto response = clients_.GetClientPool(worker_id).Call( - TxGidPair{tx_id, gid}); - CHECK(response) << "RemoteEdgeRpc failed"; - return std::move(response->name_output_); - } - + /// Returns a remote worker's record (vertex/edge) data for the given params. + /// That worker must own the vertex/edge for the given id, and that vertex + /// must be visible in given transaction. template std::unique_ptr RemoteElement(int worker_id, tx::transaction_id_t tx_id, @@ -48,16 +25,4 @@ class RemoteDataRpcClients { RpcWorkerClients &clients_; }; -template <> -inline std::unique_ptr RemoteDataRpcClients::RemoteElement( - int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) { - return RemoteEdge(worker_id, tx_id, gid); -} - -template <> -inline std::unique_ptr RemoteDataRpcClients::RemoteElement( - int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) { - return RemoteVertex(worker_id, tx_id, gid); -} - } // namespace distributed diff --git a/src/distributed/remote_data_rpc_server.cpp b/src/distributed/remote_data_rpc_server.cpp new file mode 100644 index 000000000..b33c0a13f --- /dev/null +++ b/src/distributed/remote_data_rpc_server.cpp @@ -0,0 +1,28 @@ +#include + +#include "database/graph_db_accessor.hpp" +#include "distributed/remote_data_rpc_messages.hpp" +#include "remote_data_rpc_server.hpp" + +namespace distributed { + +RemoteDataRpcServer::RemoteDataRpcServer(database::GraphDb &db, + communication::rpc::Server &server) + : db_(db), rpc_server_(server) { + rpc_server_.Register([this](const RemoteVertexReq &req) { + database::GraphDbAccessor dba(db_, req.member.tx_id); + auto vertex = dba.FindVertexChecked(req.member.gid, false); + CHECK(vertex.GetOld()) + << "Old record must exist when sending vertex by RPC"; + return std::make_unique(vertex.GetOld(), db_.WorkerId()); + }); + + rpc_server_.Register([this](const RemoteEdgeReq &req) { + database::GraphDbAccessor dba(db_, req.member.tx_id); + auto edge = dba.FindEdgeChecked(req.member.gid, false); + CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC"; + return std::make_unique(edge.GetOld(), db_.WorkerId()); + }); +} + +} // namespace distributed diff --git a/src/distributed/remote_data_rpc_server.hpp b/src/distributed/remote_data_rpc_server.hpp index d64d640d3..c1496026b 100644 --- a/src/distributed/remote_data_rpc_server.hpp +++ b/src/distributed/remote_data_rpc_server.hpp @@ -1,35 +1,15 @@ #pragma once -#include - #include "communication/rpc/server.hpp" #include "database/graph_db.hpp" -#include "database/graph_db_accessor.hpp" -#include "distributed/remote_data_rpc_messages.hpp" -#include "transactions/type.hpp" namespace distributed { -/** Serves this worker's data to others. */ +/// Serves this worker's data to others. class RemoteDataRpcServer { public: - RemoteDataRpcServer(database::GraphDb &db, communication::rpc::Server &server) - : db_(db), rpc_server_(server) { - rpc_server_.Register([this](const RemoteVertexReq &req) { - database::GraphDbAccessor dba(db_, req.member.tx_id); - auto vertex = dba.FindVertexChecked(req.member.gid, false); - CHECK(vertex.GetOld()) - << "Old record must exist when sending vertex by RPC"; - return std::make_unique(vertex.GetOld(), db_.WorkerId()); - }); - - rpc_server_.Register([this](const RemoteEdgeReq &req) { - database::GraphDbAccessor dba(db_, req.member.tx_id); - auto edge = dba.FindEdgeChecked(req.member.gid, false); - CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC"; - return std::make_unique(edge.GetOld(), db_.WorkerId()); - }); - } + RemoteDataRpcServer(database::GraphDb &db, + communication::rpc::Server &server); private: database::GraphDb &db_; diff --git a/src/distributed/remote_produce_rpc_server.cpp b/src/distributed/remote_produce_rpc_server.cpp new file mode 100644 index 000000000..d96fa6c3a --- /dev/null +++ b/src/distributed/remote_produce_rpc_server.cpp @@ -0,0 +1,170 @@ +#include "distributed/remote_produce_rpc_server.hpp" +#include "distributed/remote_data_manager.hpp" +#include "distributed/remote_pull_produce_rpc_messages.hpp" +#include "query/common.hpp" +#include "query/exceptions.hpp" +#include "transactions/engine_worker.hpp" + +namespace distributed { + +RemoteProduceRpcServer::OngoingProduce::OngoingProduce( + database::GraphDb &db, tx::transaction_id_t tx_id, + std::shared_ptr op, + query::SymbolTable symbol_table, Parameters parameters, + std::vector pull_symbols) + : dba_{db, tx_id}, + cursor_(op->MakeCursor(dba_)), + context_(dba_), + pull_symbols_(std::move(pull_symbols)), + frame_(symbol_table.max_position()) { + context_.symbol_table_ = std::move(symbol_table); + context_.parameters_ = std::move(parameters); +} + +std::pair, RemotePullState> +RemoteProduceRpcServer::OngoingProduce::Pull() { + if (!accumulation_.empty()) { + auto results = std::move(accumulation_.back()); + accumulation_.pop_back(); + for (auto &element : results) { + try { + query::ReconstructTypedValue(element); + } catch (query::ReconstructionException &) { + cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR; + return std::make_pair(std::move(results), cursor_state_); + } + } + + return std::make_pair(std::move(results), + RemotePullState::CURSOR_IN_PROGRESS); + } + + return PullOneFromCursor(); +} + +RemotePullState RemoteProduceRpcServer::OngoingProduce::Accumulate() { + while (true) { + auto result = PullOneFromCursor(); + if (result.second != RemotePullState::CURSOR_IN_PROGRESS) + return result.second; + else + accumulation_.emplace_back(std::move(result.first)); + } +} + +std::pair, RemotePullState> +RemoteProduceRpcServer::OngoingProduce::PullOneFromCursor() { + std::vector results; + + // Check if we already exhausted this cursor (or it entered an error + // state). This happens when we accumulate before normal pull. + if (cursor_state_ != RemotePullState::CURSOR_IN_PROGRESS) { + return std::make_pair(results, cursor_state_); + } + + try { + if (cursor_->Pull(frame_, context_)) { + results.reserve(pull_symbols_.size()); + for (const auto &symbol : pull_symbols_) { + results.emplace_back(std::move(frame_[symbol])); + } + } else { + cursor_state_ = RemotePullState::CURSOR_EXHAUSTED; + } + } catch (const mvcc::SerializationError &) { + cursor_state_ = RemotePullState::SERIALIZATION_ERROR; + } catch (const LockTimeoutException &) { + cursor_state_ = RemotePullState::LOCK_TIMEOUT_ERROR; + } catch (const RecordDeletedError &) { + cursor_state_ = RemotePullState::UPDATE_DELETED_ERROR; + } catch (const query::ReconstructionException &) { + cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR; + } catch (const query::RemoveAttachedVertexException &) { + cursor_state_ = RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR; + } catch (const query::QueryRuntimeException &) { + cursor_state_ = RemotePullState::QUERY_ERROR; + } catch (const query::HintedAbortError &) { + cursor_state_ = RemotePullState::HINTED_ABORT_ERROR; + } + return std::make_pair(std::move(results), cursor_state_); +} + +RemoteProduceRpcServer::RemoteProduceRpcServer( + database::GraphDb &db, tx::Engine &tx_engine, + communication::rpc::Server &server, + const distributed::PlanConsumer &plan_consumer) + : db_(db), + remote_produce_rpc_server_(server), + plan_consumer_(plan_consumer), + tx_engine_(tx_engine) { + remote_produce_rpc_server_.Register( + [this](const RemotePullReq &req) { + return std::make_unique(RemotePull(req)); + }); + + remote_produce_rpc_server_.Register( + [this](const TransactionCommandAdvancedReq &req) { + tx_engine_.UpdateCommand(req.member); + db_.remote_data_manager().ClearCacheForSingleTransaction(req.member); + return std::make_unique(); + }); +} + +void RemoteProduceRpcServer::ClearTransactionalCache( + tx::transaction_id_t oldest_active) { + auto access = ongoing_produces_.access(); + for (auto &kv : access) { + if (kv.first.first < oldest_active) { + access.remove(kv.first); + } + } +} + +RemoteProduceRpcServer::OngoingProduce & +RemoteProduceRpcServer::GetOngoingProduce(const RemotePullReq &req) { + auto access = ongoing_produces_.access(); + auto key_pair = std::make_pair(req.tx_id, req.plan_id); + auto found = access.find(key_pair); + if (found != access.end()) { + return found->second; + } + if (db_.type() == database::GraphDb::Type::DISTRIBUTED_WORKER) { + // On the worker cache the snapshot to have one RPC less. + dynamic_cast(tx_engine_) + .RunningTransaction(req.tx_id, req.tx_snapshot); + } + auto &plan_pack = plan_consumer_.PlanForId(req.plan_id); + return access + .emplace(key_pair, std::forward_as_tuple(key_pair), + std::forward_as_tuple(db_, req.tx_id, plan_pack.plan, + plan_pack.symbol_table, req.params, + req.symbols)) + .first->second; +} + +RemotePullResData RemoteProduceRpcServer::RemotePull(const RemotePullReq &req) { + auto &ongoing_produce = GetOngoingProduce(req); + + RemotePullResData result{db_.WorkerId(), req.send_old, req.send_new}; + result.state_and_frames.pull_state = RemotePullState::CURSOR_IN_PROGRESS; + + if (req.accumulate) { + result.state_and_frames.pull_state = ongoing_produce.Accumulate(); + // If an error ocurred, we need to return that error. + if (result.state_and_frames.pull_state != + RemotePullState::CURSOR_EXHAUSTED) { + return result; + } + } + + for (int i = 0; i < req.batch_size; ++i) { + auto pull_result = ongoing_produce.Pull(); + result.state_and_frames.pull_state = pull_result.second; + if (pull_result.second != RemotePullState::CURSOR_IN_PROGRESS) break; + result.state_and_frames.frames.emplace_back(std::move(pull_result.first)); + } + + return result; +} + +} // namespace distributed diff --git a/src/distributed/remote_produce_rpc_server.hpp b/src/distributed/remote_produce_rpc_server.hpp index f4d9b5f3e..49189e413 100644 --- a/src/distributed/remote_produce_rpc_server.hpp +++ b/src/distributed/remote_produce_rpc_server.hpp @@ -9,28 +9,21 @@ #include "database/graph_db.hpp" #include "database/graph_db_accessor.hpp" #include "distributed/plan_consumer.hpp" -#include "distributed/remote_data_manager.hpp" -#include "distributed/remote_pull_produce_rpc_messages.hpp" -#include "query/common.hpp" #include "query/context.hpp" -#include "query/exceptions.hpp" #include "query/frontend/semantic/symbol_table.hpp" #include "query/interpret/frame.hpp" #include "query/parameters.hpp" #include "query/plan/operator.hpp" #include "query/typed_value.hpp" #include "transactions/engine.hpp" -#include "transactions/engine_worker.hpp" #include "transactions/type.hpp" namespace distributed { -/** - * Handles the execution of a plan on the worker, requested by the remote - * master. Assumes that (tx_id, plan_id) uniquely identifies an execution, and - * that there will never be parallel requests for the same execution thus - * identified. - */ +/// Handles the execution of a plan on the worker, requested by the remote +/// master. Assumes that (tx_id, plan_id) uniquely identifies an execution, and +/// that there will never be parallel requests for the same execution thus +/// identified. class RemoteProduceRpcServer { /// Encapsulates a Cursor execution in progress. Can be used for pulling a /// single result from the execution, or pulling all and accumulating the @@ -41,50 +34,16 @@ class RemoteProduceRpcServer { OngoingProduce(database::GraphDb &db, tx::transaction_id_t tx_id, std::shared_ptr op, query::SymbolTable symbol_table, Parameters parameters, - std::vector pull_symbols) - : dba_{db, tx_id}, - cursor_(op->MakeCursor(dba_)), - context_(dba_), - pull_symbols_(std::move(pull_symbols)), - frame_(symbol_table.max_position()) { - context_.symbol_table_ = std::move(symbol_table); - context_.parameters_ = std::move(parameters); - } + std::vector pull_symbols); /// Returns a vector of typed values (one for each `pull_symbol`), and an /// indication of the pull result. The result data is valid only if the /// returned state is CURSOR_IN_PROGRESS. - std::pair, RemotePullState> Pull() { - if (!accumulation_.empty()) { - auto results = std::move(accumulation_.back()); - accumulation_.pop_back(); - for (auto &element : results) { - try { - query::ReconstructTypedValue(element); - } catch (query::ReconstructionException &) { - cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR; - return std::make_pair(std::move(results), cursor_state_); - } - } - - return std::make_pair(std::move(results), - RemotePullState::CURSOR_IN_PROGRESS); - } - - return PullOneFromCursor(); - } + std::pair, RemotePullState> Pull(); /// Accumulates all the frames pulled from the cursor and returns /// CURSOR_EXHAUSTED. If an error occurs, an appropriate value is returned. - RemotePullState Accumulate() { - while (true) { - auto result = PullOneFromCursor(); - if (result.second != RemotePullState::CURSOR_IN_PROGRESS) - return result.second; - else - accumulation_.emplace_back(std::move(result.first)); - } - } + RemotePullState Accumulate(); private: database::GraphDbAccessor dba_; @@ -95,75 +54,19 @@ class RemoteProduceRpcServer { RemotePullState cursor_state_{RemotePullState::CURSOR_IN_PROGRESS}; std::vector> accumulation_; + /// Pulls and returns a single result from the cursor. std::pair, RemotePullState> - PullOneFromCursor() { - std::vector results; - - // Check if we already exhausted this cursor (or it entered an error - // state). This happens when we accumulate before normal pull. - if (cursor_state_ != RemotePullState::CURSOR_IN_PROGRESS) { - return std::make_pair(results, cursor_state_); - } - - try { - if (cursor_->Pull(frame_, context_)) { - results.reserve(pull_symbols_.size()); - for (const auto &symbol : pull_symbols_) { - results.emplace_back(std::move(frame_[symbol])); - } - } else { - cursor_state_ = RemotePullState::CURSOR_EXHAUSTED; - } - } catch (const mvcc::SerializationError &) { - cursor_state_ = RemotePullState::SERIALIZATION_ERROR; - } catch (const LockTimeoutException &) { - cursor_state_ = RemotePullState::LOCK_TIMEOUT_ERROR; - } catch (const RecordDeletedError &) { - cursor_state_ = RemotePullState::UPDATE_DELETED_ERROR; - } catch (const query::ReconstructionException &) { - cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR; - } catch (const query::RemoveAttachedVertexException &) { - cursor_state_ = RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR; - } catch (const query::QueryRuntimeException &) { - cursor_state_ = RemotePullState::QUERY_ERROR; - } catch (const query::HintedAbortError &) { - cursor_state_ = RemotePullState::HINTED_ABORT_ERROR; - } - return std::make_pair(std::move(results), cursor_state_); - } + PullOneFromCursor(); }; public: RemoteProduceRpcServer(database::GraphDb &db, tx::Engine &tx_engine, communication::rpc::Server &server, - const distributed::PlanConsumer &plan_consumer) - : db_(db), - remote_produce_rpc_server_(server), - plan_consumer_(plan_consumer), - tx_engine_(tx_engine) { - remote_produce_rpc_server_.Register( - [this](const RemotePullReq &req) { - return std::make_unique(RemotePull(req)); - }); - - remote_produce_rpc_server_.Register( - [this](const TransactionCommandAdvancedReq &req) { - tx_engine_.UpdateCommand(req.member); - db_.remote_data_manager().ClearCacheForSingleTransaction(req.member); - return std::make_unique(); - }); - } + const distributed::PlanConsumer &plan_consumer); /// Clears the cache of local transactions that have expired. The signature of /// this method is dictated by `distributed::TransactionalCacheCleaner`. - void ClearTransactionalCache(tx::transaction_id_t oldest_active) { - auto access = ongoing_produces_.access(); - for (auto &kv : access) { - if (kv.first.first < oldest_active) { - access.remove(kv.first); - } - } - } + void ClearTransactionalCache(tx::transaction_id_t oldest_active); private: database::GraphDb &db_; @@ -173,50 +76,12 @@ class RemoteProduceRpcServer { ongoing_produces_; tx::Engine &tx_engine_; - auto &GetOngoingProduce(const RemotePullReq &req) { - auto access = ongoing_produces_.access(); - auto key_pair = std::make_pair(req.tx_id, req.plan_id); - auto found = access.find(key_pair); - if (found != access.end()) { - return found->second; - } - if (db_.type() == database::GraphDb::Type::DISTRIBUTED_WORKER) { - // On the worker cache the snapshot to have one RPC less. - dynamic_cast(tx_engine_) - .RunningTransaction(req.tx_id, req.tx_snapshot); - } - auto &plan_pack = plan_consumer_.PlanForId(req.plan_id); - return access - .emplace(key_pair, std::forward_as_tuple(key_pair), - std::forward_as_tuple(db_, req.tx_id, plan_pack.plan, - plan_pack.symbol_table, req.params, - req.symbols)) - .first->second; - } + /// Gets an ongoing produce for the given pull request. Creates a new one if + /// there is none currently existing. + OngoingProduce &GetOngoingProduce(const RemotePullReq &req); - RemotePullResData RemotePull(const RemotePullReq &req) { - auto &ongoing_produce = GetOngoingProduce(req); - - RemotePullResData result{db_.WorkerId(), req.send_old, req.send_new}; - result.state_and_frames.pull_state = RemotePullState::CURSOR_IN_PROGRESS; - - if (req.accumulate) { - result.state_and_frames.pull_state = ongoing_produce.Accumulate(); - // If an error ocurred, we need to return that error. - if (result.state_and_frames.pull_state != - RemotePullState::CURSOR_EXHAUSTED) { - return result; - } - } - - for (int i = 0; i < req.batch_size; ++i) { - auto pull_result = ongoing_produce.Pull(); - result.state_and_frames.pull_state = pull_result.second; - if (pull_result.second != RemotePullState::CURSOR_IN_PROGRESS) break; - result.state_and_frames.frames.emplace_back(std::move(pull_result.first)); - } - - return result; - } + /// Performs a single remote pull for the given request. + RemotePullResData RemotePull(const RemotePullReq &req); }; + } // namespace distributed diff --git a/src/distributed/remote_pull_rpc_clients.cpp b/src/distributed/remote_pull_rpc_clients.cpp new file mode 100644 index 000000000..0361d851e --- /dev/null +++ b/src/distributed/remote_pull_rpc_clients.cpp @@ -0,0 +1,72 @@ +#include + +#include "distributed/remote_data_manager.hpp" +#include "distributed/remote_pull_rpc_clients.hpp" +#include "storage/edge.hpp" +#include "storage/vertex.hpp" + +namespace distributed { + +utils::Future RemotePullRpcClients::RemotePull( + database::GraphDbAccessor &dba, int worker_id, int64_t plan_id, + const Parameters ¶ms, const std::vector &symbols, + bool accumulate, int batch_size) { + return clients_.ExecuteOnWorker( + worker_id, [&dba, plan_id, params, symbols, accumulate, + batch_size](ClientPool &client_pool) { + auto result = client_pool.Call( + dba.transaction_id(), dba.transaction().snapshot(), plan_id, params, + symbols, accumulate, batch_size, true, true); + + auto handle_vertex = [&dba](auto &v) { + dba.db() + .remote_data_manager() + .Elements(dba.transaction_id()) + .emplace(v.global_address.gid(), std::move(v.old_record), + std::move(v.new_record)); + if (v.element_in_frame) { + VertexAccessor va(v.global_address, dba); + *v.element_in_frame = va; + } + }; + auto handle_edge = [&dba](auto &e) { + dba.db() + .remote_data_manager() + .Elements(dba.transaction_id()) + .emplace(e.global_address.gid(), std::move(e.old_record), + std::move(e.new_record)); + if (e.element_in_frame) { + EdgeAccessor ea(e.global_address, dba); + *e.element_in_frame = ea; + } + }; + for (auto &v : result->data.vertices) handle_vertex(v); + for (auto &e : result->data.edges) handle_edge(e); + for (auto &p : result->data.paths) { + handle_vertex(p.vertices[0]); + p.path_in_frame = + query::Path(VertexAccessor(p.vertices[0].global_address, dba)); + query::Path &path_in_frame = p.path_in_frame.ValuePath(); + for (size_t i = 0; i < p.edges.size(); ++i) { + handle_edge(p.edges[i]); + path_in_frame.Expand(EdgeAccessor(p.edges[i].global_address, dba)); + handle_vertex(p.vertices[i + 1]); + path_in_frame.Expand( + VertexAccessor(p.vertices[i + 1].global_address, dba)); + } + } + + return std::move(result->data.state_and_frames); + }); +} + +std::vector> +RemotePullRpcClients::NotifyAllTransactionCommandAdvanced( + tx::transaction_id_t tx_id) { + return clients_.ExecuteOnWorkers(0, [tx_id](auto &client) { + auto res = client.template Call(tx_id); + CHECK(res) << "TransactionCommandAdvanceRpc failed"; + }); +} + +} // namespace distributed diff --git a/src/distributed/remote_pull_rpc_clients.hpp b/src/distributed/remote_pull_rpc_clients.hpp index aa7eb99d9..d00d0db9e 100644 --- a/src/distributed/remote_pull_rpc_clients.hpp +++ b/src/distributed/remote_pull_rpc_clients.hpp @@ -1,10 +1,8 @@ #pragma once -#include #include #include "database/graph_db_accessor.hpp" -#include "distributed/remote_data_manager.hpp" #include "distributed/remote_pull_produce_rpc_messages.hpp" #include "distributed/rpc_worker_clients.hpp" #include "query/frontend/semantic/symbol.hpp" @@ -14,10 +12,10 @@ namespace distributed { -/** Provides means of calling for the execution of a plan on some remote worker, - * and getting the results of that execution. The results are returned in - * batches and are therefore accompanied with an enum indicator of the state of - * remote execution. */ +/// Provides means of calling for the execution of a plan on some remote worker, +/// and getting the results of that execution. The results are returned in +/// batches and are therefore accompanied with an enum indicator of the state of +/// remote execution. class RemotePullRpcClients { using ClientPool = communication::rpc::ClientPool; @@ -34,68 +32,15 @@ class RemotePullRpcClients { utils::Future RemotePull( database::GraphDbAccessor &dba, int worker_id, int64_t plan_id, const Parameters ¶ms, const std::vector &symbols, - bool accumulate, int batch_size = kDefaultBatchSize) { - return clients_.ExecuteOnWorker( - worker_id, [&dba, plan_id, params, symbols, accumulate, - batch_size](ClientPool &client_pool) { - auto result = client_pool.Call( - dba.transaction_id(), dba.transaction().snapshot(), plan_id, - params, symbols, accumulate, batch_size, true, true); - - auto handle_vertex = [&dba](auto &v) { - dba.db() - .remote_data_manager() - .Vertices(dba.transaction_id()) - .emplace(v.global_address.gid(), std::move(v.old_record), - std::move(v.new_record)); - if (v.element_in_frame) { - VertexAccessor va(v.global_address, dba); - *v.element_in_frame = va; - } - }; - auto handle_edge = [&dba](auto &e) { - dba.db() - .remote_data_manager() - .Edges(dba.transaction_id()) - .emplace(e.global_address.gid(), std::move(e.old_record), - std::move(e.new_record)); - if (e.element_in_frame) { - EdgeAccessor ea(e.global_address, dba); - *e.element_in_frame = ea; - } - }; - for (auto &v : result->data.vertices) handle_vertex(v); - for (auto &e : result->data.edges) handle_edge(e); - for (auto &p : result->data.paths) { - handle_vertex(p.vertices[0]); - p.path_in_frame = - query::Path(VertexAccessor(p.vertices[0].global_address, dba)); - query::Path &path_in_frame = p.path_in_frame.ValuePath(); - for (size_t i = 0; i < p.edges.size(); ++i) { - handle_edge(p.edges[i]); - path_in_frame.Expand( - EdgeAccessor(p.edges[i].global_address, dba)); - handle_vertex(p.vertices[i + 1]); - path_in_frame.Expand( - VertexAccessor(p.vertices[i + 1].global_address, dba)); - } - } - - return std::move(result->data.state_and_frames); - }); - } + bool accumulate, int batch_size = kDefaultBatchSize); auto GetWorkerIds() { return clients_.GetWorkerIds(); } std::vector> NotifyAllTransactionCommandAdvanced( - tx::transaction_id_t tx_id) { - return clients_.ExecuteOnWorkers(0, [tx_id](auto &client) { - auto res = client.template Call(tx_id); - CHECK(res) << "TransactionCommandAdvanceRpc failed"; - }); - } + tx::transaction_id_t tx_id); private: RpcWorkerClients &clients_; }; + } // namespace distributed diff --git a/src/distributed/remote_updates_rpc_clients.cpp b/src/distributed/remote_updates_rpc_clients.cpp new file mode 100644 index 000000000..13181b30c --- /dev/null +++ b/src/distributed/remote_updates_rpc_clients.cpp @@ -0,0 +1,125 @@ + +#include +#include + +#include "distributed/remote_updates_rpc_clients.hpp" +#include "query/exceptions.hpp" + +namespace distributed { + +namespace { +void RaiseIfRemoteError(RemoteUpdateResult result) { + switch (result) { + case RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: + throw query::RemoveAttachedVertexException(); + case RemoteUpdateResult::SERIALIZATION_ERROR: + throw mvcc::SerializationError(); + case RemoteUpdateResult::LOCK_TIMEOUT_ERROR: + throw LockTimeoutException( + "Remote LockTimeoutError during edge creation"); + case RemoteUpdateResult::UPDATE_DELETED_ERROR: + throw RecordDeletedError(); + case RemoteUpdateResult::DONE: + break; + } +} +} + +RemoteUpdateResult RemoteUpdatesRpcClients::RemoteUpdate( + int worker_id, const database::StateDelta &delta) { + auto res = + worker_clients_.GetClientPool(worker_id).Call(delta); + CHECK(res) << "RemoteUpdateRpc failed on worker: " << worker_id; + return res->member; +} + +gid::Gid RemoteUpdatesRpcClients::RemoteCreateVertex( + int worker_id, tx::transaction_id_t tx_id, + const std::vector &labels, + const std::unordered_map + &properties) { + auto res = + worker_clients_.GetClientPool(worker_id).Call( + RemoteCreateVertexReqData{tx_id, labels, properties}); + CHECK(res) << "RemoteCreateVertexRpc failed on worker: " << worker_id; + CHECK(res->member.result == RemoteUpdateResult::DONE) + << "Remote Vertex creation result not RemoteUpdateResult::DONE"; + return res->member.gid; +} + +storage::EdgeAddress RemoteUpdatesRpcClients::RemoteCreateEdge( + tx::transaction_id_t tx_id, VertexAccessor &from, VertexAccessor &to, + storage::EdgeType edge_type) { + CHECK(from.address().is_remote()) + << "In RemoteCreateEdge `from` must be remote"; + + int from_worker = from.address().worker_id(); + auto res = worker_clients_.GetClientPool(from_worker) + .Call(RemoteCreateEdgeReqData{ + from.gid(), to.GlobalAddress(), edge_type, tx_id}); + CHECK(res) << "RemoteCreateEdge RPC failed on worker: " << from_worker; + RaiseIfRemoteError(res->member.result); + return {res->member.gid, from_worker}; +} + +void RemoteUpdatesRpcClients::RemoteAddInEdge(tx::transaction_id_t tx_id, + VertexAccessor &from, + storage::EdgeAddress edge_address, + VertexAccessor &to, + storage::EdgeType edge_type) { + CHECK(to.address().is_remote() && edge_address.is_remote() && + (from.GlobalAddress().worker_id() != to.address().worker_id())) + << "RemoteAddInEdge should only be called when `to` is remote and " + "`from` is not on the same worker as `to`."; + auto worker_id = to.GlobalAddress().worker_id(); + auto res = worker_clients_.GetClientPool(worker_id).Call( + RemoteAddInEdgeReqData{from.GlobalAddress(), edge_address, to.gid(), + edge_type, tx_id}); + CHECK(res) << "RemoteAddInEdge RPC failed on worker: " << worker_id; + RaiseIfRemoteError(res->member); +} + +void RemoteUpdatesRpcClients::RemoteRemoveVertex(int worker_id, + tx::transaction_id_t tx_id, + gid::Gid gid, + bool check_empty) { + auto res = + worker_clients_.GetClientPool(worker_id).Call( + RemoteRemoveVertexReqData{gid, tx_id, check_empty}); + CHECK(res) << "RemoteRemoveVertex RPC failed on worker: " << worker_id; + RaiseIfRemoteError(res->member); +} + +void RemoteUpdatesRpcClients::RemoteRemoveEdge( + tx::transaction_id_t tx_id, int worker_id, gid::Gid edge_gid, + gid::Gid vertex_from_id, storage::VertexAddress vertex_to_addr) { + auto res = worker_clients_.GetClientPool(worker_id).Call( + RemoteRemoveEdgeData{tx_id, edge_gid, vertex_from_id, vertex_to_addr}); + CHECK(res) << "RemoteRemoveEdge RPC failed on worker: " << worker_id; + RaiseIfRemoteError(res->member); +} + +void RemoteUpdatesRpcClients::RemoteRemoveInEdge( + tx::transaction_id_t tx_id, int worker_id, gid::Gid vertex_id, + storage::EdgeAddress edge_address) { + CHECK(edge_address.is_remote()) + << "RemoteRemoveInEdge edge_address is local."; + auto res = + worker_clients_.GetClientPool(worker_id).Call( + RemoteRemoveInEdgeData{tx_id, vertex_id, edge_address}); + CHECK(res) << "RemoteRemoveInEdge RPC failed on worker: " << worker_id; + RaiseIfRemoteError(res->member); +} + +std::vector> +RemoteUpdatesRpcClients::RemoteUpdateApplyAll(int skip_worker_id, + tx::transaction_id_t tx_id) { + return worker_clients_.ExecuteOnWorkers( + skip_worker_id, [tx_id](auto &client) { + auto res = client.template Call(tx_id); + CHECK(res) << "RemoteUpdateApplyRpc failed"; + return res->member; + }); +} + +} // namespace distributed diff --git a/src/distributed/remote_updates_rpc_clients.hpp b/src/distributed/remote_updates_rpc_clients.hpp index ba4e71cf1..b5f95c3b5 100644 --- a/src/distributed/remote_updates_rpc_clients.hpp +++ b/src/distributed/remote_updates_rpc_clients.hpp @@ -6,7 +6,6 @@ #include "database/state_delta.hpp" #include "distributed/remote_updates_rpc_messages.hpp" #include "distributed/rpc_worker_clients.hpp" -#include "query/exceptions.hpp" #include "query/typed_value.hpp" #include "storage/address_types.hpp" #include "storage/gid.hpp" @@ -26,27 +25,14 @@ class RemoteUpdatesRpcClients { /// Sends an update delta to the given worker. RemoteUpdateResult RemoteUpdate(int worker_id, - const database::StateDelta &delta) { - auto res = - worker_clients_.GetClientPool(worker_id).Call(delta); - CHECK(res) << "RemoteUpdateRpc failed on worker: " << worker_id; - return res->member; - } + const database::StateDelta &delta); /// Creates a vertex on the given worker and returns it's id. gid::Gid RemoteCreateVertex( int worker_id, tx::transaction_id_t tx_id, const std::vector &labels, const std::unordered_map - &properties) { - auto res = - worker_clients_.GetClientPool(worker_id).Call( - RemoteCreateVertexReqData{tx_id, labels, properties}); - CHECK(res) << "RemoteCreateVertexRpc failed on worker: " << worker_id; - CHECK(res->member.result == RemoteUpdateResult::DONE) - << "Remote Vertex creation result not RemoteUpdateResult::DONE"; - return res->member.gid; - } + &properties); /// Creates an edge on the given worker and returns it's address. If the `to` /// vertex is on the same worker as `from`, then all remote CRUD will be @@ -56,45 +42,17 @@ class RemoteUpdatesRpcClients { storage::EdgeAddress RemoteCreateEdge(tx::transaction_id_t tx_id, VertexAccessor &from, VertexAccessor &to, - storage::EdgeType edge_type) { - CHECK(from.address().is_remote()) - << "In RemoteCreateEdge `from` must be remote"; - - int from_worker = from.address().worker_id(); - auto res = worker_clients_.GetClientPool(from_worker) - .Call(RemoteCreateEdgeReqData{ - from.gid(), to.GlobalAddress(), edge_type, tx_id}); - CHECK(res) << "RemoteCreateEdge RPC failed on worker: " << from_worker; - RaiseIfRemoteError(res->member.result); - return {res->member.gid, from_worker}; - } + storage::EdgeType edge_type); /// Adds the edge with the given address to the `to` vertex as an incoming /// edge. Only used when `to` is remote and not on the same worker as `from`. void RemoteAddInEdge(tx::transaction_id_t tx_id, VertexAccessor &from, storage::EdgeAddress edge_address, VertexAccessor &to, - storage::EdgeType edge_type) { - CHECK(to.address().is_remote() && edge_address.is_remote() && - (from.GlobalAddress().worker_id() != to.address().worker_id())) - << "RemoteAddInEdge should only be called when `to` is remote and " - "`from` is not on the same worker as `to`."; - auto worker_id = to.GlobalAddress().worker_id(); - auto res = - worker_clients_.GetClientPool(worker_id).Call( - RemoteAddInEdgeReqData{from.GlobalAddress(), edge_address, to.gid(), - edge_type, tx_id}); - CHECK(res) << "RemoteAddInEdge RPC failed on worker: " << worker_id; - RaiseIfRemoteError(res->member); - } + storage::EdgeType edge_type); + /// Removes a vertex from the other worker. void RemoteRemoveVertex(int worker_id, tx::transaction_id_t tx_id, - gid::Gid gid, bool check_empty) { - auto res = - worker_clients_.GetClientPool(worker_id).Call( - RemoteRemoveVertexReqData{gid, tx_id, check_empty}); - CHECK(res) << "RemoteRemoveVertex RPC failed on worker: " << worker_id; - RaiseIfRemoteError(res->member); - } + gid::Gid gid, bool check_empty); /// Removes an edge on another worker. This also handles the `from` vertex /// outgoing edge, as that vertex is on the same worker as the edge. If the @@ -103,56 +61,19 @@ class RemoteUpdatesRpcClients { /// RemoteRemoveInEdge. void RemoteRemoveEdge(tx::transaction_id_t tx_id, int worker_id, gid::Gid edge_gid, gid::Gid vertex_from_id, - storage::VertexAddress vertex_to_addr) { - auto res = - worker_clients_.GetClientPool(worker_id).Call( - RemoteRemoveEdgeData{tx_id, edge_gid, vertex_from_id, - vertex_to_addr}); - CHECK(res) << "RemoteRemoveEdge RPC failed on worker: " << worker_id; - RaiseIfRemoteError(res->member); - } + storage::VertexAddress vertex_to_addr); void RemoteRemoveInEdge(tx::transaction_id_t tx_id, int worker_id, gid::Gid vertex_id, - storage::EdgeAddress edge_address) { - CHECK(edge_address.is_remote()) - << "RemoteRemoveInEdge edge_address is local."; - auto res = - worker_clients_.GetClientPool(worker_id).Call( - RemoteRemoveInEdgeData{tx_id, vertex_id, edge_address}); - CHECK(res) << "RemoteRemoveInEdge RPC failed on worker: " << worker_id; - RaiseIfRemoteError(res->member); - } + storage::EdgeAddress edge_address); /// Calls for all the workers (except the given one) to apply their updates /// and returns the future results. std::vector> RemoteUpdateApplyAll( - int skip_worker_id, tx::transaction_id_t tx_id) { - return worker_clients_.ExecuteOnWorkers( - skip_worker_id, [tx_id](auto &client) { - auto res = client.template Call(tx_id); - CHECK(res) << "RemoteUpdateApplyRpc failed"; - return res->member; - }); - } + int skip_worker_id, tx::transaction_id_t tx_id); private: RpcWorkerClients &worker_clients_; - - void RaiseIfRemoteError(RemoteUpdateResult result) { - switch (result) { - case RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: - throw query::RemoveAttachedVertexException(); - case RemoteUpdateResult::SERIALIZATION_ERROR: - throw mvcc::SerializationError(); - case RemoteUpdateResult::LOCK_TIMEOUT_ERROR: - throw LockTimeoutException( - "Remote LockTimeoutError during edge creation"); - case RemoteUpdateResult::UPDATE_DELETED_ERROR: - throw RecordDeletedError(); - case RemoteUpdateResult::DONE: - break; - } - } }; + } // namespace distributed diff --git a/src/distributed/remote_updates_rpc_server.cpp b/src/distributed/remote_updates_rpc_server.cpp new file mode 100644 index 000000000..d4428bed3 --- /dev/null +++ b/src/distributed/remote_updates_rpc_server.cpp @@ -0,0 +1,362 @@ +#include + +#include "glog/logging.h" + +#include "distributed/remote_updates_rpc_server.hpp" +#include "threading/sync/lock_timeout_exception.hpp" + +namespace distributed { + +template +RemoteUpdateResult +RemoteUpdatesRpcServer::TransactionUpdates::Emplace( + const database::StateDelta &delta) { + auto gid = std::is_same::value + ? delta.vertex_id + : delta.edge_id; + std::lock_guard guard{lock_}; + auto found = deltas_.find(gid); + if (found == deltas_.end()) { + found = + deltas_ + .emplace(gid, std::make_pair(FindAccessor(gid), + std::vector{})) + .first; + } + + found->second.second.emplace_back(delta); + + // TODO call `RecordAccessor::update` to force serialization errors to + // fail-fast (as opposed to when all the deltas get applied). + // + // This is problematic because `VersionList::update` needs to become + // thread-safe within the same transaction. Note that the concurrency is + // possible both between the owner worker interpretation thread and an RPC + // thread (current thread), as well as multiple RPC threads if this + // object's lock is released (perhaps desirable). + // + // A potential solution *might* be that `LockStore::Lock` returns a `bool` + // indicating if the caller was the one obtaining the lock (not the same + // as lock already being held by the same transaction). + // + // Another thing that needs to be done (if we do this) is ensuring that + // `LockStore::Take` is thread-safe when called in parallel in the same + // transaction. Currently it's thread-safe only when called in parallel + // from different transactions (only one manages to take the RecordLock). + // + // Deferring the implementation of this as it's tricky, and essentially an + // optimization. + // + // try { + // found->second.first.update(); + // } catch (const mvcc::SerializationError &) { + // return RemoteUpdateResult::SERIALIZATION_ERROR; + // } catch (const RecordDeletedError &) { + // return RemoteUpdateResult::UPDATE_DELETED_ERROR; + // } catch (const LockTimeoutException &) { + // return RemoteUpdateResult::LOCK_TIMEOUT_ERROR; + // } + return RemoteUpdateResult::DONE; +} + +template +gid::Gid +RemoteUpdatesRpcServer::TransactionUpdates::CreateVertex( + const std::vector &labels, + const std::unordered_map + &properties) { + auto result = db_accessor_.InsertVertex(); + for (auto &label : labels) result.add_label(label); + for (auto &kv : properties) result.PropsSet(kv.first, kv.second); + std::lock_guard guard{lock_}; + deltas_.emplace(result.gid(), + std::make_pair(result, std::vector{})); + return result.gid(); +} + +template +gid::Gid +RemoteUpdatesRpcServer::TransactionUpdates::CreateEdge( + gid::Gid from, storage::VertexAddress to, storage::EdgeType edge_type) { + auto &db = db_accessor_.db(); + auto edge = db_accessor_.InsertOnlyEdge( + {from, db.WorkerId()}, db.storage().LocalizedAddressIfPossible(to), + edge_type); + std::lock_guard guard{lock_}; + deltas_.emplace(edge.gid(), + std::make_pair(edge, std::vector{})); + return edge.gid(); +} + +template +RemoteUpdateResult +RemoteUpdatesRpcServer::TransactionUpdates::Apply() { + std::lock_guard guard{lock_}; + for (auto &kv : deltas_) { + auto &record_accessor = kv.second.first; + // We need to reconstruct the record as in the meantime some local + // update might have updated it. + record_accessor.Reconstruct(); + for (database::StateDelta &delta : kv.second.second) { + try { + auto &dba = db_accessor_; + switch (delta.type) { + case database::StateDelta::Type::TRANSACTION_BEGIN: + case database::StateDelta::Type::TRANSACTION_COMMIT: + case database::StateDelta::Type::TRANSACTION_ABORT: + case database::StateDelta::Type::CREATE_VERTEX: + case database::StateDelta::Type::CREATE_EDGE: + case database::StateDelta::Type::BUILD_INDEX: + LOG(FATAL) << "Can only apply record update deltas for remote " + "graph element"; + case database::StateDelta::Type::REMOVE_VERTEX: + if (!db_accessor().RemoveVertex( + reinterpret_cast(record_accessor), + delta.check_empty)) { + return RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR; + } + break; + case database::StateDelta::Type::SET_PROPERTY_VERTEX: + case database::StateDelta::Type::SET_PROPERTY_EDGE: + record_accessor.PropsSet(delta.property, delta.value); + break; + case database::StateDelta::Type::ADD_LABEL: + reinterpret_cast(record_accessor) + .add_label(delta.label); + break; + case database::StateDelta::Type::REMOVE_LABEL: + reinterpret_cast(record_accessor) + .remove_label(delta.label); + break; + case database::StateDelta::Type::ADD_OUT_EDGE: + reinterpret_cast(record_accessor.update()) + .out_.emplace(dba.db().storage().LocalizedAddressIfPossible( + delta.vertex_to_address), + dba.db().storage().LocalizedAddressIfPossible( + delta.edge_address), + delta.edge_type); + dba.wal().Emplace(delta); + break; + case database::StateDelta::Type::ADD_IN_EDGE: + reinterpret_cast(record_accessor.update()) + .in_.emplace(dba.db().storage().LocalizedAddressIfPossible( + delta.vertex_from_address), + dba.db().storage().LocalizedAddressIfPossible( + delta.edge_address), + delta.edge_type); + dba.wal().Emplace(delta); + break; + case database::StateDelta::Type::REMOVE_EDGE: + // We only remove the edge as a result of this StateDelta, + // because the removal of edge from vertex in/out is performed + // in REMOVE_[IN/OUT]_EDGE deltas. + db_accessor_.RemoveEdge( + reinterpret_cast(record_accessor), false, + false); + break; + case database::StateDelta::Type::REMOVE_OUT_EDGE: + reinterpret_cast(record_accessor) + .RemoveOutEdge(delta.edge_address); + break; + case database::StateDelta::Type::REMOVE_IN_EDGE: + reinterpret_cast(record_accessor) + .RemoveInEdge(delta.edge_address); + break; + } + } catch (const mvcc::SerializationError &) { + return RemoteUpdateResult::SERIALIZATION_ERROR; + } catch (const RecordDeletedError &) { + return RemoteUpdateResult::UPDATE_DELETED_ERROR; + } catch (const LockTimeoutException &) { + return RemoteUpdateResult::LOCK_TIMEOUT_ERROR; + } + } + } + return RemoteUpdateResult::DONE; +} + +RemoteUpdatesRpcServer::RemoteUpdatesRpcServer( + database::GraphDb &db, communication::rpc::Server &server) + : db_(db) { + server.Register([this](const RemoteUpdateReq &req) { + using DeltaType = database::StateDelta::Type; + auto &delta = req.member; + switch (delta.type) { + case DeltaType::SET_PROPERTY_VERTEX: + case DeltaType::ADD_LABEL: + case DeltaType::REMOVE_LABEL: + case database::StateDelta::Type::REMOVE_OUT_EDGE: + case database::StateDelta::Type::REMOVE_IN_EDGE: + return std::make_unique( + GetUpdates(vertex_updates_, delta.transaction_id).Emplace(delta)); + case DeltaType::SET_PROPERTY_EDGE: + return std::make_unique( + GetUpdates(edge_updates_, delta.transaction_id).Emplace(delta)); + default: + LOG(FATAL) << "Can't perform a remote update with delta type: " + << static_cast(req.member.type); + } + }); + + server.Register( + [this](const RemoteUpdateApplyReq &req) { + return std::make_unique(Apply(req.member)); + }); + + server.Register([this]( + const RemoteCreateVertexReq &req) { + gid::Gid gid = GetUpdates(vertex_updates_, req.member.tx_id) + .CreateVertex(req.member.labels, req.member.properties); + return std::make_unique( + RemoteCreateResult{RemoteUpdateResult::DONE, gid}); + }); + + server.Register([this](const RemoteCreateEdgeReq &req) { + auto data = req.member; + auto creation_result = CreateEdge(data); + + // If `from` and `to` are both on this worker, we handle it in this + // RPC call. Do it only if CreateEdge succeeded. + if (creation_result.result == RemoteUpdateResult::DONE && + data.to.worker_id() == db_.WorkerId()) { + auto to_delta = database::StateDelta::AddInEdge( + data.tx_id, data.to.gid(), {data.from, db_.WorkerId()}, + {creation_result.gid, db_.WorkerId()}, data.edge_type); + creation_result.result = + GetUpdates(vertex_updates_, data.tx_id).Emplace(to_delta); + } + + return std::make_unique(creation_result); + }); + + server.Register([this](const RemoteAddInEdgeReq &req) { + auto to_delta = database::StateDelta::AddInEdge( + req.member.tx_id, req.member.to, req.member.from, + req.member.edge_address, req.member.edge_type); + auto result = + GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta); + return std::make_unique(result); + }); + + server.Register( + [this](const RemoteRemoveVertexReq &req) { + auto to_delta = database::StateDelta::RemoveVertex( + req.member.tx_id, req.member.gid, req.member.check_empty); + auto result = + GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta); + return std::make_unique(result); + }); + + server.Register([this](const RemoteRemoveEdgeReq &req) { + return std::make_unique(RemoveEdge(req.member)); + }); + + server.Register( + [this](const RemoteRemoveInEdgeReq &req) { + auto data = req.member; + return std::make_unique( + GetUpdates(vertex_updates_, data.tx_id) + .Emplace(database::StateDelta::RemoveInEdge( + data.tx_id, data.vertex, data.edge_address))); + }); +} + +RemoteUpdateResult RemoteUpdatesRpcServer::Apply(tx::transaction_id_t tx_id) { + auto apply = [tx_id](auto &collection) { + auto access = collection.access(); + auto found = access.find(tx_id); + if (found == access.end()) { + return RemoteUpdateResult::DONE; + } + auto result = found->second.Apply(); + access.remove(tx_id); + return result; + }; + + auto vertex_result = apply(vertex_updates_); + auto edge_result = apply(edge_updates_); + if (vertex_result != RemoteUpdateResult::DONE) return vertex_result; + if (edge_result != RemoteUpdateResult::DONE) return edge_result; + return RemoteUpdateResult::DONE; +} + +void RemoteUpdatesRpcServer::ClearTransactionalCache( + tx::transaction_id_t oldest_active) { + auto vertex_access = vertex_updates_.access(); + for (auto &kv : vertex_access) { + if (kv.first < oldest_active) { + vertex_access.remove(kv.first); + } + } + auto edge_access = edge_updates_.access(); + for (auto &kv : edge_access) { + if (kv.first < oldest_active) { + edge_access.remove(kv.first); + } + } +} + +// Gets/creates the TransactionUpdates for the given transaction. +template +RemoteUpdatesRpcServer::TransactionUpdates + &RemoteUpdatesRpcServer::GetUpdates(MapT &updates, + tx::transaction_id_t tx_id) { + return updates.access() + .emplace(tx_id, std::make_tuple(tx_id), + std::make_tuple(std::ref(db_), tx_id)) + .first->second; +} + +RemoteCreateResult RemoteUpdatesRpcServer::CreateEdge( + const RemoteCreateEdgeReqData &req) { + auto gid = GetUpdates(edge_updates_, req.tx_id) + .CreateEdge(req.from, req.to, req.edge_type); + + auto from_delta = database::StateDelta::AddOutEdge( + req.tx_id, req.from, req.to, {gid, db_.WorkerId()}, req.edge_type); + + auto result = GetUpdates(vertex_updates_, req.tx_id).Emplace(from_delta); + return {result, gid}; +} + +RemoteUpdateResult RemoteUpdatesRpcServer::RemoveEdge( + const RemoteRemoveEdgeData &data) { + // Edge removal. + auto deletion_delta = + database::StateDelta::RemoveEdge(data.tx_id, data.edge_id); + auto result = GetUpdates(edge_updates_, data.tx_id).Emplace(deletion_delta); + + // Out-edge removal, for sure is local. + if (result == RemoteUpdateResult::DONE) { + auto remove_out_delta = database::StateDelta::RemoveOutEdge( + data.tx_id, data.vertex_from_id, {data.edge_id, db_.WorkerId()}); + result = GetUpdates(vertex_updates_, data.tx_id).Emplace(remove_out_delta); + } + + // In-edge removal, might not be local. + if (result == RemoteUpdateResult::DONE && + data.vertex_to_address.worker_id() == db_.WorkerId()) { + auto remove_in_delta = database::StateDelta::RemoveInEdge( + data.tx_id, data.vertex_to_address.gid(), + {data.edge_id, db_.WorkerId()}); + result = GetUpdates(vertex_updates_, data.tx_id).Emplace(remove_in_delta); + } + + return result; +} + +template <> +VertexAccessor +RemoteUpdatesRpcServer::TransactionUpdates::FindAccessor( + gid::Gid gid) { + return db_accessor_.FindVertexChecked(gid, false); +} + +template <> +EdgeAccessor +RemoteUpdatesRpcServer::TransactionUpdates::FindAccessor( + gid::Gid gid) { + return db_accessor_.FindEdgeChecked(gid, false); +} + +} // namespace distributed diff --git a/src/distributed/remote_updates_rpc_server.hpp b/src/distributed/remote_updates_rpc_server.hpp index 686fd023f..ae8e8d765 100644 --- a/src/distributed/remote_updates_rpc_server.hpp +++ b/src/distributed/remote_updates_rpc_server.hpp @@ -1,8 +1,6 @@ #pragma once -#include #include -#include #include #include "glog/logging.h" @@ -13,13 +11,11 @@ #include "database/graph_db_accessor.hpp" #include "database/state_delta.hpp" #include "distributed/remote_updates_rpc_messages.hpp" -#include "mvcc/version_list.hpp" #include "query/typed_value.hpp" +#include "storage/edge_accessor.hpp" #include "storage/gid.hpp" -#include "storage/record_accessor.hpp" #include "storage/types.hpp" #include "storage/vertex_accessor.hpp" -#include "threading/sync/lock_timeout_exception.hpp" #include "threading/sync/spinlock.hpp" #include "transactions/type.hpp" @@ -42,170 +38,21 @@ class RemoteUpdatesRpcServer { /// Adds a delta and returns the result. Does not modify the state (data) of /// the graph element the update is for, but calls the `update` method to /// fail-fast on serialization and update-after-delete errors. - RemoteUpdateResult Emplace(const database::StateDelta &delta) { - auto gid = std::is_same::value - ? delta.vertex_id - : delta.edge_id; - std::lock_guard guard{lock_}; - auto found = deltas_.find(gid); - if (found == deltas_.end()) { - found = deltas_ - .emplace(gid, std::make_pair( - FindAccessor(gid), - std::vector{})) - .first; - } - - found->second.second.emplace_back(delta); - - // TODO call `RecordAccessor::update` to force serialization errors to - // fail-fast (as opposed to when all the deltas get applied). - // - // This is problematic because `VersionList::update` needs to become - // thread-safe within the same transaction. Note that the concurrency is - // possible both between the owner worker interpretation thread and an RPC - // thread (current thread), as well as multiple RPC threads if this - // object's lock is released (perhaps desirable). - // - // A potential solution *might* be that `LockStore::Lock` returns a `bool` - // indicating if the caller was the one obtaining the lock (not the same - // as lock already being held by the same transaction). - // - // Another thing that needs to be done (if we do this) is ensuring that - // `LockStore::Take` is thread-safe when called in parallel in the same - // transaction. Currently it's thread-safe only when called in parallel - // from different transactions (only one manages to take the RecordLock). - // - // Deferring the implementation of this as it's tricky, and essentially an - // optimization. - // - // try { - // found->second.first.update(); - // } catch (const mvcc::SerializationError &) { - // return RemoteUpdateResult::SERIALIZATION_ERROR; - // } catch (const RecordDeletedError &) { - // return RemoteUpdateResult::UPDATE_DELETED_ERROR; - // } catch (const LockTimeoutException &) { - // return RemoteUpdateResult::LOCK_TIMEOUT_ERROR; - // } - return RemoteUpdateResult::DONE; - } + RemoteUpdateResult Emplace(const database::StateDelta &delta); /// Creates a new vertex and returns it's gid. gid::Gid CreateVertex( const std::vector &labels, const std::unordered_map - &properties) { - auto result = db_accessor_.InsertVertex(); - for (auto &label : labels) result.add_label(label); - for (auto &kv : properties) result.PropsSet(kv.first, kv.second); - std::lock_guard guard{lock_}; - deltas_.emplace( - result.gid(), - std::make_pair(result, std::vector{})); - return result.gid(); - } + &properties); /// Creates a new edge and returns it's gid. Does not update vertices at the /// end of the edge. gid::Gid CreateEdge(gid::Gid from, storage::VertexAddress to, - storage::EdgeType edge_type) { - auto &db = db_accessor_.db(); - auto edge = db_accessor_.InsertOnlyEdge( - {from, db.WorkerId()}, db.storage().LocalizedAddressIfPossible(to), - edge_type); - std::lock_guard guard{lock_}; - deltas_.emplace( - edge.gid(), - std::make_pair(edge, std::vector{})); - return edge.gid(); - } + storage::EdgeType edge_type); /// Applies all the deltas on the record. - RemoteUpdateResult Apply() { - std::lock_guard guard{lock_}; - for (auto &kv : deltas_) { - auto &record_accessor = kv.second.first; - // We need to reconstruct the record as in the meantime some local - // update might have updated it. - record_accessor.Reconstruct(); - for (database::StateDelta &delta : kv.second.second) { - try { - auto &dba = db_accessor_; - switch (delta.type) { - case database::StateDelta::Type::TRANSACTION_BEGIN: - case database::StateDelta::Type::TRANSACTION_COMMIT: - case database::StateDelta::Type::TRANSACTION_ABORT: - case database::StateDelta::Type::CREATE_VERTEX: - case database::StateDelta::Type::CREATE_EDGE: - case database::StateDelta::Type::BUILD_INDEX: - LOG(FATAL) << "Can only apply record update deltas for remote " - "graph element"; - case database::StateDelta::Type::REMOVE_VERTEX: - if (!db_accessor().RemoveVertex( - reinterpret_cast(record_accessor), - delta.check_empty)) { - return RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR; - } - break; - case database::StateDelta::Type::SET_PROPERTY_VERTEX: - case database::StateDelta::Type::SET_PROPERTY_EDGE: - record_accessor.PropsSet(delta.property, delta.value); - break; - case database::StateDelta::Type::ADD_LABEL: - reinterpret_cast(record_accessor) - .add_label(delta.label); - break; - case database::StateDelta::Type::REMOVE_LABEL: - reinterpret_cast(record_accessor) - .remove_label(delta.label); - break; - case database::StateDelta::Type::ADD_OUT_EDGE: - reinterpret_cast(record_accessor.update()) - .out_.emplace(dba.db().storage().LocalizedAddressIfPossible( - delta.vertex_to_address), - dba.db().storage().LocalizedAddressIfPossible( - delta.edge_address), - delta.edge_type); - dba.wal().Emplace(delta); - break; - case database::StateDelta::Type::ADD_IN_EDGE: - reinterpret_cast(record_accessor.update()) - .in_.emplace(dba.db().storage().LocalizedAddressIfPossible( - delta.vertex_from_address), - dba.db().storage().LocalizedAddressIfPossible( - delta.edge_address), - delta.edge_type); - dba.wal().Emplace(delta); - break; - case database::StateDelta::Type::REMOVE_EDGE: - // We only remove the edge as a result of this StateDelta, - // because the removal of edge from vertex in/out is performed - // in REMOVE_[IN/OUT]_EDGE deltas. - db_accessor_.RemoveEdge( - reinterpret_cast(record_accessor), false, - false); - break; - case database::StateDelta::Type::REMOVE_OUT_EDGE: - reinterpret_cast(record_accessor) - .RemoveOutEdge(delta.edge_address); - break; - case database::StateDelta::Type::REMOVE_IN_EDGE: - reinterpret_cast(record_accessor) - .RemoveInEdge(delta.edge_address); - break; - } - } catch (const mvcc::SerializationError &) { - return RemoteUpdateResult::SERIALIZATION_ERROR; - } catch (const RecordDeletedError &) { - return RemoteUpdateResult::UPDATE_DELETED_ERROR; - } catch (const LockTimeoutException &) { - return RemoteUpdateResult::LOCK_TIMEOUT_ERROR; - } - } - } - return RemoteUpdateResult::DONE; - } + RemoteUpdateResult Apply(); auto &db_accessor() { return db_accessor_; } @@ -223,132 +70,16 @@ class RemoteUpdatesRpcServer { public: RemoteUpdatesRpcServer(database::GraphDb &db, - communication::rpc::Server &server) - : db_(db) { - server.Register([this](const RemoteUpdateReq &req) { - using DeltaType = database::StateDelta::Type; - auto &delta = req.member; - switch (delta.type) { - case DeltaType::SET_PROPERTY_VERTEX: - case DeltaType::ADD_LABEL: - case DeltaType::REMOVE_LABEL: - case database::StateDelta::Type::REMOVE_OUT_EDGE: - case database::StateDelta::Type::REMOVE_IN_EDGE: - return std::make_unique( - GetUpdates(vertex_updates_, delta.transaction_id).Emplace(delta)); - case DeltaType::SET_PROPERTY_EDGE: - return std::make_unique( - GetUpdates(edge_updates_, delta.transaction_id).Emplace(delta)); - default: - LOG(FATAL) << "Can't perform a remote update with delta type: " - << static_cast(req.member.type); - } - }); - - server.Register( - [this](const RemoteUpdateApplyReq &req) { - return std::make_unique(Apply(req.member)); - }); - - server.Register( - [this](const RemoteCreateVertexReq &req) { - gid::Gid gid = - GetUpdates(vertex_updates_, req.member.tx_id) - .CreateVertex(req.member.labels, req.member.properties); - return std::make_unique( - RemoteCreateResult{RemoteUpdateResult::DONE, gid}); - }); - - server.Register( - [this](const RemoteCreateEdgeReq &req) { - auto data = req.member; - auto creation_result = CreateEdge(data); - - // If `from` and `to` are both on this worker, we handle it in this - // RPC call. Do it only if CreateEdge succeeded. - if (creation_result.result == RemoteUpdateResult::DONE && - data.to.worker_id() == db_.WorkerId()) { - auto to_delta = database::StateDelta::AddInEdge( - data.tx_id, data.to.gid(), {data.from, db_.WorkerId()}, - {creation_result.gid, db_.WorkerId()}, data.edge_type); - creation_result.result = - GetUpdates(vertex_updates_, data.tx_id).Emplace(to_delta); - } - - return std::make_unique(creation_result); - }); - - server.Register([this](const RemoteAddInEdgeReq &req) { - auto to_delta = database::StateDelta::AddInEdge( - req.member.tx_id, req.member.to, req.member.from, - req.member.edge_address, req.member.edge_type); - auto result = - GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta); - return std::make_unique(result); - }); - - server.Register( - [this](const RemoteRemoveVertexReq &req) { - auto to_delta = database::StateDelta::RemoveVertex( - req.member.tx_id, req.member.gid, req.member.check_empty); - auto result = - GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta); - return std::make_unique(result); - }); - - server.Register( - [this](const RemoteRemoveEdgeReq &req) { - return std::make_unique(RemoveEdge(req.member)); - }); - - server.Register( - [this](const RemoteRemoveInEdgeReq &req) { - auto data = req.member; - return std::make_unique( - GetUpdates(vertex_updates_, data.tx_id) - .Emplace(database::StateDelta::RemoveInEdge( - data.tx_id, data.vertex, data.edge_address))); - }); - } + communication::rpc::Server &server); /// Applies all existsing updates for the given transaction ID. If there are /// no updates for that transaction, nothing happens. Clears the updates cache /// after applying them, regardless of the result. - RemoteUpdateResult Apply(tx::transaction_id_t tx_id) { - auto apply = [tx_id](auto &collection) { - auto access = collection.access(); - auto found = access.find(tx_id); - if (found == access.end()) { - return RemoteUpdateResult::DONE; - } - auto result = found->second.Apply(); - access.remove(tx_id); - return result; - }; - - auto vertex_result = apply(vertex_updates_); - auto edge_result = apply(edge_updates_); - if (vertex_result != RemoteUpdateResult::DONE) return vertex_result; - if (edge_result != RemoteUpdateResult::DONE) return edge_result; - return RemoteUpdateResult::DONE; - } + RemoteUpdateResult Apply(tx::transaction_id_t tx_id); /// Clears the cache of local transactions that have expired. The signature of /// this method is dictated by `distributed::CacheCleaner`. - void ClearTransactionalCache(tx::transaction_id_t oldest_active) { - auto vertex_access = vertex_updates_.access(); - for (auto &kv : vertex_access) { - if (kv.first < oldest_active) { - vertex_access.remove(kv.first); - } - } - auto edge_access = edge_updates_.access(); - for (auto &kv : edge_access) { - if (kv.first < oldest_active) { - edge_access.remove(kv.first); - } - } - } + void ClearTransactionalCache(tx::transaction_id_t oldest_active); private: database::GraphDb &db_; @@ -362,62 +93,13 @@ class RemoteUpdatesRpcServer { // Gets/creates the TransactionUpdates for the given transaction. template TransactionUpdates &GetUpdates(MapT &updates, - tx::transaction_id_t tx_id) { - return updates.access() - .emplace(tx_id, std::make_tuple(tx_id), - std::make_tuple(std::ref(db_), tx_id)) - .first->second; - } + tx::transaction_id_t tx_id); - RemoteCreateResult CreateEdge(const RemoteCreateEdgeReqData &req) { - auto gid = GetUpdates(edge_updates_, req.tx_id) - .CreateEdge(req.from, req.to, req.edge_type); + // Performs edge creation for the given request. + RemoteCreateResult CreateEdge(const RemoteCreateEdgeReqData &req); - auto from_delta = database::StateDelta::AddOutEdge( - req.tx_id, req.from, req.to, {gid, db_.WorkerId()}, req.edge_type); - - auto result = GetUpdates(vertex_updates_, req.tx_id).Emplace(from_delta); - return {result, gid}; - } - - RemoteUpdateResult RemoveEdge(const RemoteRemoveEdgeData &data) { - // Edge removal. - auto deletion_delta = - database::StateDelta::RemoveEdge(data.tx_id, data.edge_id); - auto result = GetUpdates(edge_updates_, data.tx_id).Emplace(deletion_delta); - - // Out-edge removal, for sure is local. - if (result == RemoteUpdateResult::DONE) { - auto remove_out_delta = database::StateDelta::RemoveOutEdge( - data.tx_id, data.vertex_from_id, {data.edge_id, db_.WorkerId()}); - result = - GetUpdates(vertex_updates_, data.tx_id).Emplace(remove_out_delta); - } - - // In-edge removal, might not be local. - if (result == RemoteUpdateResult::DONE && - data.vertex_to_address.worker_id() == db_.WorkerId()) { - auto remove_in_delta = database::StateDelta::RemoveInEdge( - data.tx_id, data.vertex_to_address.gid(), - {data.edge_id, db_.WorkerId()}); - result = GetUpdates(vertex_updates_, data.tx_id).Emplace(remove_in_delta); - } - - return result; - } + // Performs edge removal for the given request. + RemoteUpdateResult RemoveEdge(const RemoteRemoveEdgeData &data); }; -template <> -inline VertexAccessor -RemoteUpdatesRpcServer::TransactionUpdates::FindAccessor( - gid::Gid gid) { - return db_accessor_.FindVertexChecked(gid, false); -} - -template <> -inline EdgeAccessor -RemoteUpdatesRpcServer::TransactionUpdates::FindAccessor( - gid::Gid gid) { - return db_accessor_.FindEdgeChecked(gid, false); -} } // namespace distributed diff --git a/src/storage/record_accessor.cpp b/src/storage/record_accessor.cpp index 9d8cbbaa4..4b566e489 100644 --- a/src/storage/record_accessor.cpp +++ b/src/storage/record_accessor.cpp @@ -4,6 +4,7 @@ #include "database/state_delta.hpp" #include "distributed/remote_data_manager.hpp" #include "distributed/remote_updates_rpc_clients.hpp" +#include "query/exceptions.hpp" #include "storage/edge.hpp" #include "storage/record_accessor.hpp" #include "storage/vertex.hpp"