diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c9c0eba09..8637e7021 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -19,14 +19,14 @@ set(memgraph_src_files distributed/index_rpc_server.cpp distributed/plan_consumer.cpp distributed/plan_dispatcher.cpp - distributed/remote_cache.cpp - distributed/remote_data_manager.cpp - distributed/remote_data_rpc_clients.cpp - distributed/remote_data_rpc_server.cpp - distributed/remote_produce_rpc_server.cpp - distributed/remote_pull_rpc_clients.cpp - distributed/remote_updates_rpc_clients.cpp - distributed/remote_updates_rpc_server.cpp + distributed/cache.cpp + distributed/data_manager.cpp + distributed/data_rpc_clients.cpp + distributed/data_rpc_server.cpp + distributed/produce_rpc_server.cpp + distributed/pull_rpc_clients.cpp + distributed/updates_rpc_clients.cpp + distributed/updates_rpc_server.cpp durability/paths.cpp durability/recovery.cpp durability/snapshooter.cpp diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp index 5b1e881e6..a181b8ebd 100644 --- a/src/communication/rpc/messages-inl.hpp +++ b/src/communication/rpc/messages-inl.hpp @@ -6,11 +6,11 @@ #include "database/state_delta.hpp" #include "distributed/coordination_rpc_messages.hpp" +#include "distributed/data_rpc_messages.hpp" #include "distributed/index_rpc_messages.hpp" #include "distributed/plan_rpc_messages.hpp" -#include "distributed/remote_data_rpc_messages.hpp" -#include "distributed/remote_pull_produce_rpc_messages.hpp" -#include "distributed/remote_updates_rpc_messages.hpp" +#include "distributed/pull_produce_rpc_messages.hpp" +#include "distributed/updates_rpc_messages.hpp" #include "stats/stats_rpc_messages.hpp" #include "storage/concurrent_id_mapper_rpc_messages.hpp" #include "transactions/engine_rpc_messages.hpp" @@ -59,10 +59,10 @@ BOOST_CLASS_EXPORT(distributed::StopWorkerReq); BOOST_CLASS_EXPORT(distributed::StopWorkerRes); // Distributed data exchange. -BOOST_CLASS_EXPORT(distributed::RemoteEdgeReq); -BOOST_CLASS_EXPORT(distributed::RemoteEdgeRes); -BOOST_CLASS_EXPORT(distributed::RemoteVertexReq); -BOOST_CLASS_EXPORT(distributed::RemoteVertexRes); +BOOST_CLASS_EXPORT(distributed::EdgeReq); +BOOST_CLASS_EXPORT(distributed::EdgeRes); +BOOST_CLASS_EXPORT(distributed::VertexReq); +BOOST_CLASS_EXPORT(distributed::VertexRes); BOOST_CLASS_EXPORT(distributed::TxGidPair); // Distributed plan exchange. @@ -71,9 +71,9 @@ BOOST_CLASS_EXPORT(distributed::DispatchPlanRes); BOOST_CLASS_EXPORT(distributed::RemovePlanReq); BOOST_CLASS_EXPORT(distributed::RemovePlanRes); -// Remote pull. -BOOST_CLASS_EXPORT(distributed::RemotePullReq); -BOOST_CLASS_EXPORT(distributed::RemotePullRes); +// Pull. +BOOST_CLASS_EXPORT(distributed::PullReq); +BOOST_CLASS_EXPORT(distributed::PullRes); BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedReq); BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedRes); @@ -88,30 +88,30 @@ BOOST_CLASS_EXPORT(stats::StatsRes); BOOST_CLASS_EXPORT(stats::BatchStatsReq); BOOST_CLASS_EXPORT(stats::BatchStatsRes); -// Remote updates. +// Updates. BOOST_CLASS_EXPORT(database::StateDelta); -BOOST_CLASS_EXPORT(distributed::RemoteUpdateReq); -BOOST_CLASS_EXPORT(distributed::RemoteUpdateRes); -BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyReq); -BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyRes); +BOOST_CLASS_EXPORT(distributed::UpdateReq); +BOOST_CLASS_EXPORT(distributed::UpdateRes); +BOOST_CLASS_EXPORT(distributed::UpdateApplyReq); +BOOST_CLASS_EXPORT(distributed::UpdateApplyRes); -// Remote creates. -BOOST_CLASS_EXPORT(distributed::RemoteCreateResult); -BOOST_CLASS_EXPORT(distributed::RemoteCreateVertexReq); -BOOST_CLASS_EXPORT(distributed::RemoteCreateVertexReqData); -BOOST_CLASS_EXPORT(distributed::RemoteCreateVertexRes); -BOOST_CLASS_EXPORT(distributed::RemoteCreateEdgeReqData); -BOOST_CLASS_EXPORT(distributed::RemoteCreateEdgeReq); -BOOST_CLASS_EXPORT(distributed::RemoteCreateEdgeRes); -BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeReqData); -BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeReq); -BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeRes); +// Creates. +BOOST_CLASS_EXPORT(distributed::CreateResult); +BOOST_CLASS_EXPORT(distributed::CreateVertexReq); +BOOST_CLASS_EXPORT(distributed::CreateVertexReqData); +BOOST_CLASS_EXPORT(distributed::CreateVertexRes); +BOOST_CLASS_EXPORT(distributed::CreateEdgeReqData); +BOOST_CLASS_EXPORT(distributed::CreateEdgeReq); +BOOST_CLASS_EXPORT(distributed::CreateEdgeRes); +BOOST_CLASS_EXPORT(distributed::AddInEdgeReqData); +BOOST_CLASS_EXPORT(distributed::AddInEdgeReq); +BOOST_CLASS_EXPORT(distributed::AddInEdgeRes); -// Remote removal. -BOOST_CLASS_EXPORT(distributed::RemoteRemoveVertexReq); -BOOST_CLASS_EXPORT(distributed::RemoteRemoveVertexRes); -BOOST_CLASS_EXPORT(distributed::RemoteRemoveEdgeReq); -BOOST_CLASS_EXPORT(distributed::RemoteRemoveEdgeRes); -BOOST_CLASS_EXPORT(distributed::RemoteRemoveInEdgeData); -BOOST_CLASS_EXPORT(distributed::RemoteRemoveInEdgeReq); -BOOST_CLASS_EXPORT(distributed::RemoteRemoveInEdgeRes); +// Removes. +BOOST_CLASS_EXPORT(distributed::RemoveVertexReq); +BOOST_CLASS_EXPORT(distributed::RemoveVertexRes); +BOOST_CLASS_EXPORT(distributed::RemoveEdgeReq); +BOOST_CLASS_EXPORT(distributed::RemoveEdgeRes); +BOOST_CLASS_EXPORT(distributed::RemoveInEdgeData); +BOOST_CLASS_EXPORT(distributed::RemoveInEdgeReq); +BOOST_CLASS_EXPORT(distributed::RemoveInEdgeRes); diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 915aba193..27410b53f 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -4,17 +4,17 @@ #include "database/graph_db.hpp" #include "distributed/coordination_master.hpp" #include "distributed/coordination_worker.hpp" +#include "distributed/data_manager.hpp" +#include "distributed/data_rpc_clients.hpp" +#include "distributed/data_rpc_server.hpp" #include "distributed/index_rpc_server.hpp" #include "distributed/plan_consumer.hpp" #include "distributed/plan_dispatcher.hpp" -#include "distributed/remote_data_manager.hpp" -#include "distributed/remote_data_rpc_clients.hpp" -#include "distributed/remote_data_rpc_server.hpp" -#include "distributed/remote_produce_rpc_server.hpp" -#include "distributed/remote_pull_rpc_clients.hpp" -#include "distributed/remote_updates_rpc_clients.hpp" -#include "distributed/remote_updates_rpc_server.hpp" +#include "distributed/produce_rpc_server.hpp" +#include "distributed/pull_rpc_clients.hpp" #include "distributed/transactional_cache_cleaner.hpp" +#include "distributed/updates_rpc_clients.hpp" +#include "distributed/updates_rpc_server.hpp" #include "durability/paths.hpp" #include "durability/recovery.hpp" #include "durability/snapshooter.hpp" @@ -42,10 +42,10 @@ class PrivateBase : public GraphDb { durability::WriteAheadLog &wal() override { return wal_; } int WorkerId() const override { return config_.worker_id; } - distributed::RemotePullRpcClients &remote_pull_clients() override { + distributed::PullRpcClients &pull_clients() override { LOG(FATAL) << "Remote pull clients only available in master."; } - distributed::RemoteProduceRpcServer &remote_produce_server() override { + distributed::ProduceRpcServer &produce_server() override { LOG(FATAL) << "Remote produce server only available in worker."; } distributed::PlanConsumer &plan_consumer() override { @@ -101,10 +101,10 @@ class SingleNode : public PrivateBase { TypemapPack typemap_pack_; database::SingleNodeCounters counters_; std::vector GetWorkerIds() const override { return {0}; } - distributed::RemoteDataRpcServer &remote_data_server() override { + distributed::DataRpcServer &data_server() override { LOG(FATAL) << "Remote data server not available in single-node."; } - distributed::RemoteDataRpcClients &remote_data_clients() override { + distributed::DataRpcClients &data_clients() override { LOG(FATAL) << "Remote data clients not available in single-node."; } distributed::PlanDispatcher &plan_dispatcher() override { @@ -113,42 +113,38 @@ class SingleNode : public PrivateBase { distributed::PlanConsumer &plan_consumer() override { LOG(FATAL) << "Plan Consumer not available in single-node."; } - distributed::RemoteUpdatesRpcServer &remote_updates_server() override { + distributed::UpdatesRpcServer &updates_server() override { LOG(FATAL) << "Remote updates server not available in single-node."; } - distributed::RemoteUpdatesRpcClients &remote_updates_clients() override { + distributed::UpdatesRpcClients &updates_clients() override { LOG(FATAL) << "Remote updates clients not available in single-node."; } - distributed::RemoteDataManager &remote_data_manager() override { + distributed::DataManager &data_manager() override { LOG(FATAL) << "Remote data manager not available in single-node."; } }; -#define IMPL_DISTRIBUTED_GETTERS \ - std::vector GetWorkerIds() const override { \ - return coordination_.GetWorkerIds(); \ - } \ - distributed::RemoteDataRpcServer &remote_data_server() override { \ - return remote_data_server_; \ - } \ - distributed::RemoteDataRpcClients &remote_data_clients() override { \ - return remote_data_clients_; \ - } \ - distributed::RemoteUpdatesRpcServer &remote_updates_server() override { \ - return remote_updates_server_; \ - } \ - distributed::RemoteUpdatesRpcClients &remote_updates_clients() override { \ - return remote_updates_clients_; \ - } \ - distributed::RemoteDataManager &remote_data_manager() override { \ - return remote_data_manager_; \ - } +#define IMPL_DISTRIBUTED_GETTERS \ + std::vector GetWorkerIds() const override { \ + return coordination_.GetWorkerIds(); \ + } \ + distributed::DataRpcServer &data_server() override { return data_server_; } \ + distributed::DataRpcClients &data_clients() override { \ + return data_clients_; \ + } \ + distributed::UpdatesRpcServer &updates_server() override { \ + return updates_server_; \ + } \ + distributed::UpdatesRpcClients &updates_clients() override { \ + return updates_clients_; \ + } \ + distributed::DataManager &data_manager() override { return data_manager_; } class Master : public PrivateBase { public: explicit Master(const Config &config) : PrivateBase(config) { - cache_cleaner_.Register(remote_updates_server_); - cache_cleaner_.Register(remote_data_manager_); + cache_cleaner_.Register(updates_server_); + cache_cleaner_.Register(data_manager_); } GraphDb::Type type() const override { @@ -159,9 +155,7 @@ class Master : public PrivateBase { distributed::PlanDispatcher &plan_dispatcher() override { return plan_dispatcher_; } - distributed::RemotePullRpcClients &remote_pull_clients() override { - return remote_pull_clients_; - } + distributed::PullRpcClients &pull_clients() override { return pull_clients_; } distributed::IndexRpcClients &index_rpc_clients() override { return index_rpc_clients_; } @@ -181,16 +175,14 @@ class Master : public PrivateBase { distributed::RpcWorkerClients rpc_worker_clients_{coordination_}; TypemapPack typemap_pack_{server_}; database::MasterCounters counters_{server_}; - distributed::RemoteDataRpcServer remote_data_server_{*this, server_}; - distributed::RemoteDataRpcClients remote_data_clients_{rpc_worker_clients_}; + distributed::DataRpcServer data_server_{*this, server_}; + distributed::DataRpcClients data_clients_{rpc_worker_clients_}; distributed::PlanDispatcher plan_dispatcher_{rpc_worker_clients_}; - distributed::RemotePullRpcClients remote_pull_clients_{rpc_worker_clients_}; + distributed::PullRpcClients pull_clients_{rpc_worker_clients_}; distributed::IndexRpcClients index_rpc_clients_{rpc_worker_clients_}; - distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, server_}; - distributed::RemoteUpdatesRpcClients remote_updates_clients_{ - rpc_worker_clients_}; - distributed::RemoteDataManager remote_data_manager_{storage_, - remote_data_clients_}; + distributed::UpdatesRpcServer updates_server_{*this, server_}; + distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_}; + distributed::DataManager data_manager_{storage_, data_clients_}; distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_}; }; @@ -199,9 +191,9 @@ class Worker : public PrivateBase { explicit Worker(const Config &config) : PrivateBase(config) { coordination_.RegisterWorker(config.worker_id); cache_cleaner_.Register(tx_engine_); - cache_cleaner_.Register(remote_produce_server_); - cache_cleaner_.Register(remote_updates_server_); - cache_cleaner_.Register(remote_data_manager_); + cache_cleaner_.Register(produce_server_); + cache_cleaner_.Register(updates_server_); + cache_cleaner_.Register(data_manager_); } GraphDb::Type type() const override { @@ -210,8 +202,8 @@ class Worker : public PrivateBase { IMPL_GETTERS IMPL_DISTRIBUTED_GETTERS distributed::PlanConsumer &plan_consumer() override { return plan_consumer_; } - distributed::RemoteProduceRpcServer &remote_produce_server() override { - return remote_produce_server_; + distributed::ProduceRpcServer &produce_server() override { + return produce_server_; } ~Worker() { @@ -231,17 +223,15 @@ class Worker : public PrivateBase { TypemapPack typemap_pack_{ rpc_worker_clients_.GetClientPool(0)}; database::WorkerCounters counters_{rpc_worker_clients_.GetClientPool(0)}; - distributed::RemoteDataRpcServer remote_data_server_{*this, server_}; - distributed::RemoteDataRpcClients remote_data_clients_{rpc_worker_clients_}; + distributed::DataRpcServer data_server_{*this, server_}; + distributed::DataRpcClients data_clients_{rpc_worker_clients_}; distributed::PlanConsumer plan_consumer_{server_}; - distributed::RemoteProduceRpcServer remote_produce_server_{ - *this, tx_engine_, server_, plan_consumer_}; + distributed::ProduceRpcServer produce_server_{*this, tx_engine_, server_, + plan_consumer_}; distributed::IndexRpcServer index_rpc_server_{*this, server_}; - distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, server_}; - distributed::RemoteUpdatesRpcClients remote_updates_clients_{ - rpc_worker_clients_}; - distributed::RemoteDataManager remote_data_manager_{storage_, - remote_data_clients_}; + distributed::UpdatesRpcServer updates_server_{*this, server_}; + distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_}; + distributed::DataManager data_manager_{storage_, data_clients_}; distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_}; }; @@ -311,11 +301,11 @@ int PublicBase::WorkerId() const { return impl_->WorkerId(); } std::vector PublicBase::GetWorkerIds() const { return impl_->GetWorkerIds(); } -distributed::RemoteDataRpcServer &PublicBase::remote_data_server() { - return impl_->remote_data_server(); +distributed::DataRpcServer &PublicBase::data_server() { + return impl_->data_server(); } -distributed::RemoteDataRpcClients &PublicBase::remote_data_clients() { - return impl_->remote_data_clients(); +distributed::DataRpcClients &PublicBase::data_clients() { + return impl_->data_clients(); } distributed::PlanDispatcher &PublicBase::plan_dispatcher() { return impl_->plan_dispatcher(); @@ -326,20 +316,20 @@ distributed::IndexRpcClients &PublicBase::index_rpc_clients() { distributed::PlanConsumer &PublicBase::plan_consumer() { return impl_->plan_consumer(); } -distributed::RemotePullRpcClients &PublicBase::remote_pull_clients() { - return impl_->remote_pull_clients(); +distributed::PullRpcClients &PublicBase::pull_clients() { + return impl_->pull_clients(); } -distributed::RemoteProduceRpcServer &PublicBase::remote_produce_server() { - return impl_->remote_produce_server(); +distributed::ProduceRpcServer &PublicBase::produce_server() { + return impl_->produce_server(); } -distributed::RemoteUpdatesRpcServer &PublicBase::remote_updates_server() { - return impl_->remote_updates_server(); +distributed::UpdatesRpcServer &PublicBase::updates_server() { + return impl_->updates_server(); } -distributed::RemoteUpdatesRpcClients &PublicBase::remote_updates_clients() { - return impl_->remote_updates_clients(); +distributed::UpdatesRpcClients &PublicBase::updates_clients() { + return impl_->updates_clients(); } -distributed::RemoteDataManager &PublicBase::remote_data_manager() { - return impl_->remote_data_manager(); +distributed::DataManager &PublicBase::data_manager() { + return impl_->data_manager(); } void PublicBase::MakeSnapshot() { diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 08ea9b26f..4db0756f9 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -15,15 +15,15 @@ #include "utils/scheduler.hpp" namespace distributed { -class RemoteDataRpcServer; -class RemoteDataRpcClients; +class DataRpcServer; +class DataRpcClients; class PlanDispatcher; class PlanConsumer; -class RemotePullRpcClients; -class RemoteProduceRpcServer; -class RemoteUpdatesRpcServer; -class RemoteUpdatesRpcClients; -class RemoteDataManager; +class PullRpcClients; +class ProduceRpcServer; +class UpdatesRpcServer; +class UpdatesRpcClients; +class DataManager; class IndexRpcClients; } @@ -95,20 +95,20 @@ class GraphDb { virtual std::vector GetWorkerIds() const = 0; // Supported only in distributed master and worker, not in single-node. - virtual distributed::RemoteDataRpcServer &remote_data_server() = 0; - virtual distributed::RemoteDataRpcClients &remote_data_clients() = 0; - virtual distributed::RemoteUpdatesRpcServer &remote_updates_server() = 0; - virtual distributed::RemoteUpdatesRpcClients &remote_updates_clients() = 0; - virtual distributed::RemoteDataManager &remote_data_manager() = 0; + virtual distributed::DataRpcServer &data_server() = 0; + virtual distributed::DataRpcClients &data_clients() = 0; + virtual distributed::UpdatesRpcServer &updates_server() = 0; + virtual distributed::UpdatesRpcClients &updates_clients() = 0; + virtual distributed::DataManager &data_manager() = 0; // Supported only in distributed master. - virtual distributed::RemotePullRpcClients &remote_pull_clients() = 0; + virtual distributed::PullRpcClients &pull_clients() = 0; virtual distributed::PlanDispatcher &plan_dispatcher() = 0; virtual distributed::IndexRpcClients &index_rpc_clients() = 0; // Supported only in distributed worker. // TODO remove once end2end testing is possible. - virtual distributed::RemoteProduceRpcServer &remote_produce_server() = 0; + virtual distributed::ProduceRpcServer &produce_server() = 0; virtual distributed::PlanConsumer &plan_consumer() = 0; GraphDb(const GraphDb &) = delete; @@ -138,16 +138,16 @@ class PublicBase : public GraphDb { void CollectGarbage() override; int WorkerId() const override; std::vector GetWorkerIds() const override; - distributed::RemoteDataRpcServer &remote_data_server() override; - distributed::RemoteDataRpcClients &remote_data_clients() override; + distributed::DataRpcServer &data_server() override; + distributed::DataRpcClients &data_clients() override; distributed::PlanDispatcher &plan_dispatcher() override; distributed::IndexRpcClients &index_rpc_clients() override; distributed::PlanConsumer &plan_consumer() override; - distributed::RemotePullRpcClients &remote_pull_clients() override; - distributed::RemoteProduceRpcServer &remote_produce_server() override; - distributed::RemoteUpdatesRpcServer &remote_updates_server() override; - distributed::RemoteUpdatesRpcClients &remote_updates_clients() override; - distributed::RemoteDataManager &remote_data_manager() override; + distributed::PullRpcClients &pull_clients() override; + distributed::ProduceRpcServer &produce_server() override; + distributed::UpdatesRpcServer &updates_server() override; + distributed::UpdatesRpcClients &updates_clients() override; + distributed::DataManager &data_manager() override; bool is_accepting_transactions() const { return is_accepting_transactions_; } diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index 1e8458351..20ae56d9c 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -5,9 +5,9 @@ #include "database/graph_db_accessor.hpp" #include "database/state_delta.hpp" -#include "distributed/remote_data_manager.hpp" -#include "distributed/remote_updates_rpc_clients.hpp" +#include "distributed/data_manager.hpp" #include "distributed/rpc_worker_clients.hpp" +#include "distributed/updates_rpc_clients.hpp" #include "storage/address_types.hpp" #include "storage/edge.hpp" #include "storage/edge_accessor.hpp" @@ -85,14 +85,14 @@ VertexAccessor GraphDbAccessor::InsertVertexIntoRemote( CHECK(worker_id != db().WorkerId()) << "Not allowed to call InsertVertexIntoRemote for local worker"; - gid::Gid gid = db().remote_updates_clients().RemoteCreateVertex( + gid::Gid gid = db().updates_clients().CreateVertex( worker_id, transaction_id(), labels, properties); auto vertex = std::make_unique(); vertex->labels_ = labels; for (auto &kv : properties) vertex->properties_.set(kv.first, kv.second); - db().remote_data_manager() + db().data_manager() .Elements(transaction_id()) .emplace(gid, nullptr, std::move(vertex)); return VertexAccessor({gid, worker_id}, *this); @@ -340,8 +340,8 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor, if (!vertex_accessor.is_local()) { auto address = vertex_accessor.address(); - db().remote_updates_clients().RemoteRemoveVertex( - address.worker_id(), transaction_id(), address.gid(), check_empty); + db().updates_clients().RemoveVertex(address.worker_id(), transaction_id(), + address.gid(), check_empty); // We can't know if we are going to be able to remove vertex until deferred // updates on a remote worker are executed return true; @@ -411,15 +411,15 @@ EdgeAccessor GraphDbAccessor::InsertEdge( EdgeTypeName(edge_type))); } else { - edge_address = db().remote_updates_clients().RemoteCreateEdge( - transaction_id(), from, to, edge_type); + edge_address = db().updates_clients().CreateEdge(transaction_id(), from, to, + edge_type); - from_updated = db().remote_data_manager() + from_updated = db().data_manager() .Elements(transaction_id()) .FindNew(from.gid()); - // Create an Edge and insert it into the RemoteCache so we see it locally. - db().remote_data_manager() + // Create an Edge and insert it into the Cache so we see it locally. + db().data_manager() .Elements(transaction_id()) .emplace( edge_address.gid(), nullptr, @@ -440,11 +440,11 @@ EdgeAccessor GraphDbAccessor::InsertEdge( // The RPC call for the `to` side is already handled if `from` is not local. if (from.is_local() || from.address().worker_id() != to.address().worker_id()) { - db().remote_updates_clients().RemoteAddInEdge( + db().updates_clients().AddInEdge( transaction_id(), from, db().storage().GlobalizedAddress(edge_address), to, edge_type); } - to_updated = db().remote_data_manager() + to_updated = db().data_manager() .Elements(transaction_id()) .FindNew(to.gid()); } @@ -498,15 +498,15 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge, CHECK(edge_addr.worker_id() == from_addr.worker_id()) << "Edge and it's 'from' vertex not on the same worker"; auto to_addr = db().storage().GlobalizedAddress(edge.to_addr()); - db().remote_updates_clients().RemoteRemoveEdge( - transaction_id(), edge_addr.worker_id(), edge_addr.gid(), - from_addr.gid(), to_addr); + db().updates_clients().RemoveEdge(transaction_id(), edge_addr.worker_id(), + edge_addr.gid(), from_addr.gid(), + to_addr); // Another RPC is necessary only if the first did not handle vertices on // both sides. if (edge_addr.worker_id() != to_addr.worker_id()) { - db().remote_updates_clients().RemoteRemoveInEdge( - transaction_id(), to_addr.worker_id(), to_addr.gid(), edge_addr); + db().updates_clients().RemoveInEdge(transaction_id(), to_addr.worker_id(), + to_addr.gid(), edge_addr); } } } diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index bcd55b4fb..627c973db 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -9,7 +9,7 @@ #include "glog/logging.h" #include "database/graph_db.hpp" -#include "distributed/remote_cache.hpp" +#include "distributed/cache.hpp" #include "query/typed_value.hpp" #include "storage/address_types.hpp" #include "storage/edge_accessor.hpp" diff --git a/src/distributed/remote_cache.cpp b/src/distributed/cache.cpp similarity index 75% rename from src/distributed/remote_cache.cpp rename to src/distributed/cache.cpp index 37eb80404..a8099cd69 100644 --- a/src/distributed/remote_cache.cpp +++ b/src/distributed/cache.cpp @@ -2,14 +2,14 @@ #include "glog/logging.h" #include "database/storage.hpp" -#include "distributed/remote_cache.hpp" +#include "distributed/cache.hpp" #include "storage/edge.hpp" #include "storage/vertex.hpp" namespace distributed { template -TRecord *RemoteCache::FindNew(gid::Gid gid) { +TRecord *Cache::FindNew(gid::Gid gid) { std::lock_guard guard{lock_}; auto found = cache_.find(gid); DCHECK(found != cache_.end()) @@ -22,10 +22,9 @@ TRecord *RemoteCache::FindNew(gid::Gid gid) { } template -void RemoteCache::FindSetOldNew(tx::transaction_id_t tx_id, - int worker_id, gid::Gid gid, - TRecord *&old_record, - TRecord *&new_record) { +void Cache::FindSetOldNew(tx::transaction_id_t tx_id, int worker_id, + gid::Gid gid, TRecord *&old_record, + TRecord *&new_record) { { std::lock_guard guard(lock_); auto found = cache_.find(gid); @@ -36,8 +35,7 @@ void RemoteCache::FindSetOldNew(tx::transaction_id_t tx_id, } } - auto remote = - remote_data_clients_.RemoteElement(worker_id, tx_id, gid); + auto remote = data_clients_.RemoteElement(worker_id, tx_id, gid); LocalizeAddresses(*remote); // This logic is a bit strange because we need to make sure that someone @@ -54,8 +52,8 @@ void RemoteCache::FindSetOldNew(tx::transaction_id_t tx_id, } template -void RemoteCache::emplace(gid::Gid gid, rec_uptr old_record, - rec_uptr new_record) { +void Cache::emplace(gid::Gid gid, rec_uptr old_record, + rec_uptr new_record) { if (old_record) LocalizeAddresses(*old_record); if (new_record) LocalizeAddresses(*new_record); @@ -71,13 +69,13 @@ void RemoteCache::emplace(gid::Gid gid, rec_uptr old_record, } template -void RemoteCache::ClearCache() { +void Cache::ClearCache() { std::lock_guard guard{lock_}; cache_.clear(); } template <> -void RemoteCache::LocalizeAddresses(Vertex &vertex) { +void Cache::LocalizeAddresses(Vertex &vertex) { auto localize_edges = [this](auto &edges) { for (auto &element : edges) { element.vertex = storage_.LocalizedAddressIfPossible(element.vertex); @@ -90,12 +88,12 @@ void RemoteCache::LocalizeAddresses(Vertex &vertex) { } template <> -void RemoteCache::LocalizeAddresses(Edge &edge) { +void Cache::LocalizeAddresses(Edge &edge) { edge.from_ = storage_.LocalizedAddressIfPossible(edge.from_); edge.to_ = storage_.LocalizedAddressIfPossible(edge.to_); } -template class RemoteCache; -template class RemoteCache; +template class Cache; +template class Cache; } // namespace distributed diff --git a/src/distributed/remote_cache.hpp b/src/distributed/cache.hpp similarity index 80% rename from src/distributed/remote_cache.hpp rename to src/distributed/cache.hpp index 1278838a7..b62af0e7d 100644 --- a/src/distributed/remote_cache.hpp +++ b/src/distributed/cache.hpp @@ -3,7 +3,7 @@ #include #include -#include "distributed/remote_data_rpc_clients.hpp" +#include "distributed/data_rpc_clients.hpp" #include "storage/gid.hpp" namespace database { @@ -16,19 +16,18 @@ namespace distributed { * Used for caching Vertices and Edges that are stored on another worker in a * distributed system. Maps global IDs to (old, new) Vertex/Edge pointer * pairs. It is possible that either "old" or "new" are nullptrs, but at - * least one must be not-null. The RemoteCache is the owner of TRecord + * least one must be not-null. The Cache is the owner of TRecord * objects it points to. * * @tparam TRecord - Edge or Vertex */ template -class RemoteCache { +class Cache { using rec_uptr = std::unique_ptr; public: - RemoteCache(database::Storage &storage, - distributed::RemoteDataRpcClients &remote_data_clients) - : storage_(storage), remote_data_clients_(remote_data_clients) {} + Cache(database::Storage &storage, distributed::DataRpcClients &data_clients) + : storage_(storage), data_clients_(data_clients) {} /// Returns the new data for the given ID. Creates it (as copy of old) if /// necessary. @@ -51,7 +50,7 @@ class RemoteCache { database::Storage &storage_; std::mutex lock_; - distributed::RemoteDataRpcClients &remote_data_clients_; + distributed::DataRpcClients &data_clients_; // TODO it'd be better if we had VertexData and EdgeData in here, as opposed // to Vertex and Edge. std::unordered_map> cache_; diff --git a/src/distributed/data_manager.cpp b/src/distributed/data_manager.cpp new file mode 100644 index 000000000..6e1ced718 --- /dev/null +++ b/src/distributed/data_manager.cpp @@ -0,0 +1,53 @@ +#include "distributed/data_manager.hpp" +#include "database/storage.hpp" + +namespace distributed { + +template +Cache &DataManager::GetCache(CacheT &collection, + tx::transaction_id_t tx_id) { + auto access = collection.access(); + auto found = access.find(tx_id); + if (found != access.end()) return found->second; + + return access + .emplace(tx_id, std::make_tuple(tx_id), + std::make_tuple(std::ref(storage_), std::ref(data_clients_))) + .first->second; +} + +template <> +Cache &DataManager::Elements(tx::transaction_id_t tx_id) { + return GetCache(vertices_caches_, tx_id); +} + +template <> +Cache &DataManager::Elements(tx::transaction_id_t tx_id) { + return GetCache(edges_caches_, tx_id); +} + +DataManager::DataManager(database::Storage &storage, + distributed::DataRpcClients &data_clients) + : storage_(storage), data_clients_(data_clients) {} + +void DataManager::ClearCacheForSingleTransaction(tx::transaction_id_t tx_id) { + Elements(tx_id).ClearCache(); + Elements(tx_id).ClearCache(); +} + +void DataManager::ClearTransactionalCache(tx::transaction_id_t oldest_active) { + auto vertex_access = vertices_caches_.access(); + for (auto &kv : vertex_access) { + if (kv.first < oldest_active) { + vertex_access.remove(kv.first); + } + } + auto edge_access = edges_caches_.access(); + for (auto &kv : edge_access) { + if (kv.first < oldest_active) { + edge_access.remove(kv.first); + } + } +} + +} // namespace distributed diff --git a/src/distributed/remote_data_manager.hpp b/src/distributed/data_manager.hpp similarity index 64% rename from src/distributed/remote_data_manager.hpp rename to src/distributed/data_manager.hpp index 6eb1b1518..9a8b761c2 100644 --- a/src/distributed/remote_data_manager.hpp +++ b/src/distributed/data_manager.hpp @@ -1,8 +1,8 @@ #pragma once #include "data_structures/concurrent/concurrent_map.hpp" -#include "distributed/remote_cache.hpp" -#include "distributed/remote_data_rpc_clients.hpp" +#include "distributed/cache.hpp" +#include "distributed/data_rpc_clients.hpp" #include "transactions/type.hpp" class Vertex; @@ -15,22 +15,22 @@ class Storage; namespace distributed { /// Handles remote data caches for edges and vertices, per transaction. -class RemoteDataManager { +class DataManager { template - using CacheT = ConcurrentMap>; + using CacheT = ConcurrentMap>; // Helper, gets or inserts a data cache for the given transaction. template - RemoteCache &GetCache(CacheT &collection, - tx::transaction_id_t tx_id); + Cache &GetCache(CacheT &collection, + tx::transaction_id_t tx_id); public: - RemoteDataManager(database::Storage &storage, - distributed::RemoteDataRpcClients &remote_data_clients); + DataManager(database::Storage &storage, + distributed::DataRpcClients &data_clients); /// Gets or creates the remote vertex/edge cache for the given transaction. template - RemoteCache &Elements(tx::transaction_id_t tx_id); + Cache &Elements(tx::transaction_id_t tx_id); /// Removes all the caches for a single transaction. void ClearCacheForSingleTransaction(tx::transaction_id_t tx_id); @@ -41,7 +41,7 @@ class RemoteDataManager { private: database::Storage &storage_; - RemoteDataRpcClients &remote_data_clients_; + DataRpcClients &data_clients_; CacheT vertices_caches_; CacheT edges_caches_; }; diff --git a/src/distributed/data_rpc_clients.cpp b/src/distributed/data_rpc_clients.cpp new file mode 100644 index 000000000..9e9f3602e --- /dev/null +++ b/src/distributed/data_rpc_clients.cpp @@ -0,0 +1,27 @@ +#include "distributed/data_rpc_clients.hpp" +#include "distributed/data_rpc_messages.hpp" +#include "storage/edge.hpp" +#include "storage/vertex.hpp" + +namespace distributed { + +template <> +std::unique_ptr DataRpcClients::RemoteElement(int worker_id, + tx::transaction_id_t tx_id, + gid::Gid gid) { + auto response = + clients_.GetClientPool(worker_id).Call(TxGidPair{tx_id, gid}); + CHECK(response) << "EdgeRpc failed"; + return std::move(response->name_output_); +} + +template <> +std::unique_ptr DataRpcClients::RemoteElement( + int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) { + auto response = + clients_.GetClientPool(worker_id).Call(TxGidPair{tx_id, gid}); + CHECK(response) << "VertexRpc failed"; + return std::move(response->name_output_); +} + +} // namespace distributed diff --git a/src/distributed/remote_data_rpc_clients.hpp b/src/distributed/data_rpc_clients.hpp similarity index 87% rename from src/distributed/remote_data_rpc_clients.hpp rename to src/distributed/data_rpc_clients.hpp index 8fe794d95..e4610415d 100644 --- a/src/distributed/remote_data_rpc_clients.hpp +++ b/src/distributed/data_rpc_clients.hpp @@ -10,9 +10,9 @@ namespace distributed { /// Provides access to other worker's data. -class RemoteDataRpcClients { +class DataRpcClients { public: - RemoteDataRpcClients(RpcWorkerClients &clients) : clients_(clients) {} + DataRpcClients(RpcWorkerClients &clients) : clients_(clients) {} /// Returns a remote worker's record (vertex/edge) data for the given params. /// That worker must own the vertex/edge for the given id, and that vertex /// must be visible in given transaction. diff --git a/src/distributed/remote_data_rpc_messages.hpp b/src/distributed/data_rpc_messages.hpp similarity index 83% rename from src/distributed/remote_data_rpc_messages.hpp rename to src/distributed/data_rpc_messages.hpp index a7617e071..7baa97800 100644 --- a/src/distributed/remote_data_rpc_messages.hpp +++ b/src/distributed/data_rpc_messages.hpp @@ -27,10 +27,10 @@ struct TxGidPair { }; #define MAKE_RESPONSE(type, name) \ - class Remote##type##Res : public communication::rpc::Message { \ + class type##Res : public communication::rpc::Message { \ public: \ - Remote##type##Res() {} \ - Remote##type##Res(const type *name, int worker_id) \ + type##Res() {} \ + type##Res(const type *name, int worker_id) \ : name_input_(name), worker_id_(worker_id) {} \ \ template \ @@ -59,12 +59,10 @@ MAKE_RESPONSE(Edge, edge) #undef MAKE_RESPONSE -RPC_SINGLE_MEMBER_MESSAGE(RemoteVertexReq, TxGidPair); -RPC_SINGLE_MEMBER_MESSAGE(RemoteEdgeReq, TxGidPair); +RPC_SINGLE_MEMBER_MESSAGE(VertexReq, TxGidPair); +RPC_SINGLE_MEMBER_MESSAGE(EdgeReq, TxGidPair); -using RemoteVertexRpc = - communication::rpc::RequestResponse; -using RemoteEdgeRpc = - communication::rpc::RequestResponse; +using VertexRpc = communication::rpc::RequestResponse; +using EdgeRpc = communication::rpc::RequestResponse; } // namespace distributed diff --git a/src/distributed/data_rpc_server.cpp b/src/distributed/data_rpc_server.cpp new file mode 100644 index 000000000..90f32f305 --- /dev/null +++ b/src/distributed/data_rpc_server.cpp @@ -0,0 +1,29 @@ +#include + +#include "data_rpc_server.hpp" +#include "database/graph_db_accessor.hpp" +#include "distributed/data_rpc_messages.hpp" + +namespace distributed { + +DataRpcServer::DataRpcServer(database::GraphDb &db, + communication::rpc::Server &server) + : db_(db), rpc_server_(server) { + rpc_server_.Register( + [this](const VertexReq &req) { + database::GraphDbAccessor dba(db_, req.member.tx_id); + auto vertex = dba.FindVertex(req.member.gid, false); + CHECK(vertex.GetOld()) + << "Old record must exist when sending vertex by RPC"; + return std::make_unique(vertex.GetOld(), db_.WorkerId()); + }); + + rpc_server_.Register([this](const EdgeReq &req) { + database::GraphDbAccessor dba(db_, req.member.tx_id); + auto edge = dba.FindEdge(req.member.gid, false); + CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC"; + return std::make_unique(edge.GetOld(), db_.WorkerId()); + }); +} + +} // namespace distributed diff --git a/src/distributed/remote_data_rpc_server.hpp b/src/distributed/data_rpc_server.hpp similarity index 67% rename from src/distributed/remote_data_rpc_server.hpp rename to src/distributed/data_rpc_server.hpp index c1496026b..91612a5cc 100644 --- a/src/distributed/remote_data_rpc_server.hpp +++ b/src/distributed/data_rpc_server.hpp @@ -6,10 +6,9 @@ namespace distributed { /// Serves this worker's data to others. -class RemoteDataRpcServer { +class DataRpcServer { public: - RemoteDataRpcServer(database::GraphDb &db, - communication::rpc::Server &server); + DataRpcServer(database::GraphDb &db, communication::rpc::Server &server); private: database::GraphDb &db_; diff --git a/src/distributed/remote_produce_rpc_server.cpp b/src/distributed/produce_rpc_server.cpp similarity index 63% rename from src/distributed/remote_produce_rpc_server.cpp rename to src/distributed/produce_rpc_server.cpp index d96fa6c3a..24ec97492 100644 --- a/src/distributed/remote_produce_rpc_server.cpp +++ b/src/distributed/produce_rpc_server.cpp @@ -1,13 +1,13 @@ -#include "distributed/remote_produce_rpc_server.hpp" -#include "distributed/remote_data_manager.hpp" -#include "distributed/remote_pull_produce_rpc_messages.hpp" +#include "distributed/produce_rpc_server.hpp" +#include "distributed/data_manager.hpp" +#include "distributed/pull_produce_rpc_messages.hpp" #include "query/common.hpp" #include "query/exceptions.hpp" #include "transactions/engine_worker.hpp" namespace distributed { -RemoteProduceRpcServer::OngoingProduce::OngoingProduce( +ProduceRpcServer::OngoingProduce::OngoingProduce( database::GraphDb &db, tx::transaction_id_t tx_id, std::shared_ptr op, query::SymbolTable symbol_table, Parameters parameters, @@ -21,8 +21,8 @@ RemoteProduceRpcServer::OngoingProduce::OngoingProduce( context_.parameters_ = std::move(parameters); } -std::pair, RemotePullState> -RemoteProduceRpcServer::OngoingProduce::Pull() { +std::pair, PullState> +ProduceRpcServer::OngoingProduce::Pull() { if (!accumulation_.empty()) { auto results = std::move(accumulation_.back()); accumulation_.pop_back(); @@ -30,35 +30,34 @@ RemoteProduceRpcServer::OngoingProduce::Pull() { try { query::ReconstructTypedValue(element); } catch (query::ReconstructionException &) { - cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR; + cursor_state_ = PullState::RECONSTRUCTION_ERROR; return std::make_pair(std::move(results), cursor_state_); } } - return std::make_pair(std::move(results), - RemotePullState::CURSOR_IN_PROGRESS); + return std::make_pair(std::move(results), PullState::CURSOR_IN_PROGRESS); } return PullOneFromCursor(); } -RemotePullState RemoteProduceRpcServer::OngoingProduce::Accumulate() { +PullState ProduceRpcServer::OngoingProduce::Accumulate() { while (true) { auto result = PullOneFromCursor(); - if (result.second != RemotePullState::CURSOR_IN_PROGRESS) + if (result.second != PullState::CURSOR_IN_PROGRESS) return result.second; else accumulation_.emplace_back(std::move(result.first)); } } -std::pair, RemotePullState> -RemoteProduceRpcServer::OngoingProduce::PullOneFromCursor() { +std::pair, PullState> +ProduceRpcServer::OngoingProduce::PullOneFromCursor() { std::vector results; // Check if we already exhausted this cursor (or it entered an error // state). This happens when we accumulate before normal pull. - if (cursor_state_ != RemotePullState::CURSOR_IN_PROGRESS) { + if (cursor_state_ != PullState::CURSOR_IN_PROGRESS) { return std::make_pair(results, cursor_state_); } @@ -69,48 +68,47 @@ RemoteProduceRpcServer::OngoingProduce::PullOneFromCursor() { results.emplace_back(std::move(frame_[symbol])); } } else { - cursor_state_ = RemotePullState::CURSOR_EXHAUSTED; + cursor_state_ = PullState::CURSOR_EXHAUSTED; } } catch (const mvcc::SerializationError &) { - cursor_state_ = RemotePullState::SERIALIZATION_ERROR; + cursor_state_ = PullState::SERIALIZATION_ERROR; } catch (const LockTimeoutException &) { - cursor_state_ = RemotePullState::LOCK_TIMEOUT_ERROR; + cursor_state_ = PullState::LOCK_TIMEOUT_ERROR; } catch (const RecordDeletedError &) { - cursor_state_ = RemotePullState::UPDATE_DELETED_ERROR; + cursor_state_ = PullState::UPDATE_DELETED_ERROR; } catch (const query::ReconstructionException &) { - cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR; + cursor_state_ = PullState::RECONSTRUCTION_ERROR; } catch (const query::RemoveAttachedVertexException &) { - cursor_state_ = RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR; + cursor_state_ = PullState::UNABLE_TO_DELETE_VERTEX_ERROR; } catch (const query::QueryRuntimeException &) { - cursor_state_ = RemotePullState::QUERY_ERROR; + cursor_state_ = PullState::QUERY_ERROR; } catch (const query::HintedAbortError &) { - cursor_state_ = RemotePullState::HINTED_ABORT_ERROR; + cursor_state_ = PullState::HINTED_ABORT_ERROR; } return std::make_pair(std::move(results), cursor_state_); } -RemoteProduceRpcServer::RemoteProduceRpcServer( +ProduceRpcServer::ProduceRpcServer( database::GraphDb &db, tx::Engine &tx_engine, communication::rpc::Server &server, const distributed::PlanConsumer &plan_consumer) : db_(db), - remote_produce_rpc_server_(server), + produce_rpc_server_(server), plan_consumer_(plan_consumer), tx_engine_(tx_engine) { - remote_produce_rpc_server_.Register( - [this](const RemotePullReq &req) { - return std::make_unique(RemotePull(req)); - }); + produce_rpc_server_.Register([this](const PullReq &req) { + return std::make_unique(Pull(req)); + }); - remote_produce_rpc_server_.Register( + produce_rpc_server_.Register( [this](const TransactionCommandAdvancedReq &req) { tx_engine_.UpdateCommand(req.member); - db_.remote_data_manager().ClearCacheForSingleTransaction(req.member); + db_.data_manager().ClearCacheForSingleTransaction(req.member); return std::make_unique(); }); } -void RemoteProduceRpcServer::ClearTransactionalCache( +void ProduceRpcServer::ClearTransactionalCache( tx::transaction_id_t oldest_active) { auto access = ongoing_produces_.access(); for (auto &kv : access) { @@ -120,8 +118,8 @@ void RemoteProduceRpcServer::ClearTransactionalCache( } } -RemoteProduceRpcServer::OngoingProduce & -RemoteProduceRpcServer::GetOngoingProduce(const RemotePullReq &req) { +ProduceRpcServer::OngoingProduce &ProduceRpcServer::GetOngoingProduce( + const PullReq &req) { auto access = ongoing_produces_.access(); auto key_pair = std::make_pair(req.tx_id, req.plan_id); auto found = access.find(key_pair); @@ -142,17 +140,16 @@ RemoteProduceRpcServer::GetOngoingProduce(const RemotePullReq &req) { .first->second; } -RemotePullResData RemoteProduceRpcServer::RemotePull(const RemotePullReq &req) { +PullResData ProduceRpcServer::Pull(const PullReq &req) { auto &ongoing_produce = GetOngoingProduce(req); - RemotePullResData result{db_.WorkerId(), req.send_old, req.send_new}; - result.state_and_frames.pull_state = RemotePullState::CURSOR_IN_PROGRESS; + PullResData result{db_.WorkerId(), req.send_old, req.send_new}; + result.state_and_frames.pull_state = PullState::CURSOR_IN_PROGRESS; if (req.accumulate) { result.state_and_frames.pull_state = ongoing_produce.Accumulate(); // If an error ocurred, we need to return that error. - if (result.state_and_frames.pull_state != - RemotePullState::CURSOR_EXHAUSTED) { + if (result.state_and_frames.pull_state != PullState::CURSOR_EXHAUSTED) { return result; } } @@ -160,7 +157,7 @@ RemotePullResData RemoteProduceRpcServer::RemotePull(const RemotePullReq &req) { for (int i = 0; i < req.batch_size; ++i) { auto pull_result = ongoing_produce.Pull(); result.state_and_frames.pull_state = pull_result.second; - if (pull_result.second != RemotePullState::CURSOR_IN_PROGRESS) break; + if (pull_result.second != PullState::CURSOR_IN_PROGRESS) break; result.state_and_frames.frames.emplace_back(std::move(pull_result.first)); } diff --git a/src/distributed/remote_produce_rpc_server.hpp b/src/distributed/produce_rpc_server.hpp similarity index 80% rename from src/distributed/remote_produce_rpc_server.hpp rename to src/distributed/produce_rpc_server.hpp index 49189e413..3423f7871 100644 --- a/src/distributed/remote_produce_rpc_server.hpp +++ b/src/distributed/produce_rpc_server.hpp @@ -24,7 +24,7 @@ namespace distributed { /// master. Assumes that (tx_id, plan_id) uniquely identifies an execution, and /// that there will never be parallel requests for the same execution thus /// identified. -class RemoteProduceRpcServer { +class ProduceRpcServer { /// Encapsulates a Cursor execution in progress. Can be used for pulling a /// single result from the execution, or pulling all and accumulating the /// results. Accumulations are used for synchronizing updates in distributed @@ -39,11 +39,11 @@ class RemoteProduceRpcServer { /// Returns a vector of typed values (one for each `pull_symbol`), and an /// indication of the pull result. The result data is valid only if the /// returned state is CURSOR_IN_PROGRESS. - std::pair, RemotePullState> Pull(); + std::pair, PullState> Pull(); /// Accumulates all the frames pulled from the cursor and returns /// CURSOR_EXHAUSTED. If an error occurs, an appropriate value is returned. - RemotePullState Accumulate(); + PullState Accumulate(); private: database::GraphDbAccessor dba_; @@ -51,18 +51,17 @@ class RemoteProduceRpcServer { query::Context context_; std::vector pull_symbols_; query::Frame frame_; - RemotePullState cursor_state_{RemotePullState::CURSOR_IN_PROGRESS}; + PullState cursor_state_{PullState::CURSOR_IN_PROGRESS}; std::vector> accumulation_; /// Pulls and returns a single result from the cursor. - std::pair, RemotePullState> - PullOneFromCursor(); + std::pair, PullState> PullOneFromCursor(); }; public: - RemoteProduceRpcServer(database::GraphDb &db, tx::Engine &tx_engine, - communication::rpc::Server &server, - const distributed::PlanConsumer &plan_consumer); + ProduceRpcServer(database::GraphDb &db, tx::Engine &tx_engine, + communication::rpc::Server &server, + const distributed::PlanConsumer &plan_consumer); /// Clears the cache of local transactions that have expired. The signature of /// this method is dictated by `distributed::TransactionalCacheCleaner`. @@ -70,7 +69,7 @@ class RemoteProduceRpcServer { private: database::GraphDb &db_; - communication::rpc::Server &remote_produce_rpc_server_; + communication::rpc::Server &produce_rpc_server_; const distributed::PlanConsumer &plan_consumer_; ConcurrentMap, OngoingProduce> ongoing_produces_; @@ -78,10 +77,10 @@ class RemoteProduceRpcServer { /// Gets an ongoing produce for the given pull request. Creates a new one if /// there is none currently existing. - OngoingProduce &GetOngoingProduce(const RemotePullReq &req); + OngoingProduce &GetOngoingProduce(const PullReq &req); /// Performs a single remote pull for the given request. - RemotePullResData RemotePull(const RemotePullReq &req); + PullResData Pull(const PullReq &req); }; } // namespace distributed diff --git a/src/distributed/remote_pull_produce_rpc_messages.hpp b/src/distributed/pull_produce_rpc_messages.hpp similarity index 89% rename from src/distributed/remote_pull_produce_rpc_messages.hpp rename to src/distributed/pull_produce_rpc_messages.hpp index 9660b2772..4ef6a8e95 100644 --- a/src/distributed/remote_pull_produce_rpc_messages.hpp +++ b/src/distributed/pull_produce_rpc_messages.hpp @@ -23,7 +23,7 @@ constexpr int kDefaultBatchSize = 20; /// Returnd along with a batch of results in the remote-pull RPC. Indicates the /// state of execution on the worker. -enum class RemotePullState { +enum class PullState { CURSOR_EXHAUSTED, CURSOR_IN_PROGRESS, SERIALIZATION_ERROR, @@ -35,12 +35,11 @@ enum class RemotePullState { QUERY_ERROR }; -struct RemotePullReq : public communication::rpc::Message { - RemotePullReq() {} - RemotePullReq(tx::transaction_id_t tx_id, tx::Snapshot tx_snapshot, - int64_t plan_id, const Parameters ¶ms, - std::vector symbols, bool accumulate, - int batch_size, bool send_old, bool send_new) +struct PullReq : public communication::rpc::Message { + PullReq() {} + PullReq(tx::transaction_id_t tx_id, tx::Snapshot tx_snapshot, int64_t plan_id, + const Parameters ¶ms, std::vector symbols, + bool accumulate, int batch_size, bool send_old, bool send_new) : tx_id(tx_id), tx_snapshot(tx_snapshot), plan_id(plan_id), @@ -109,10 +108,10 @@ struct RemotePullReq : public communication::rpc::Message { BOOST_SERIALIZATION_SPLIT_MEMBER() }; -/// The data returned to the end consumer (the RemotePull operator). Contains +/// The data returned to the end consumer (the Pull operator). Contains /// only the relevant parts of the response, ready for use. -struct RemotePullData { - RemotePullState pull_state; +struct PullData { + PullState pull_state; std::vector> frames; }; @@ -121,17 +120,17 @@ struct RemotePullData { /// (possibly encapsulated in lists/maps) to their proper values. This requires /// a GraphDbAccessor and therefore can't be done as part of deserialization. /// -/// TODO - make it possible to inject a &GraphDbAcessor from the RemotePull +/// TODO - make it possible to inject a &GraphDbAcessor from the Pull /// layer /// all the way into RPC data deserialization to remove the requirement for /// post-processing. The current approach of holding references to parts of the /// frame (potentially embedded in lists/maps) is too error-prone. -struct RemotePullResData { +struct PullResData { private: // Temp cache for deserialized vertices and edges. These objects are created // during deserialization. They are used immediatelly after during // post-processing. The vertex/edge data ownership gets transfered to the - // RemoteCache, and the `element_in_frame` reference is used to set the + // Cache, and the `element_in_frame` reference is used to set the // appropriate accessor to the appropriate value. Not used on side that // generates the response. template @@ -164,16 +163,16 @@ struct RemotePullResData { }; public: - RemotePullResData() {} // Default constructor required for serialization. - RemotePullResData(int worker_id, bool send_old, bool send_new) + PullResData() {} // Default constructor required for serialization. + PullResData(int worker_id, bool send_old, bool send_new) : worker_id(worker_id), send_old(send_old), send_new(send_new) {} - RemotePullResData(const RemotePullResData &) = delete; - RemotePullResData &operator=(const RemotePullResData &) = delete; - RemotePullResData(RemotePullResData &&) = default; - RemotePullResData &operator=(RemotePullResData &&) = default; + PullResData(const PullResData &) = delete; + PullResData &operator=(const PullResData &) = delete; + PullResData(PullResData &&) = default; + PullResData &operator=(PullResData &&) = default; - RemotePullData state_and_frames; + PullData state_and_frames; // Id of the worker on which the response is created, used for serializing // vertices (converting local to global addresses). int worker_id; @@ -182,7 +181,7 @@ struct RemotePullResData { bool send_new; // Temporary caches used between deserialization and post-processing - // (transfering the ownership of this data to a RemoteCache). + // (transfering the ownership of this data to a Cache). std::vector> vertices; std::vector> edges; std::vector paths; @@ -306,12 +305,12 @@ struct RemotePullResData { } }; -class RemotePullRes : public communication::rpc::Message { +class PullRes : public communication::rpc::Message { public: - RemotePullRes() {} - RemotePullRes(RemotePullResData data) : data(std::move(data)) {} + PullRes() {} + PullRes(PullResData data) : data(std::move(data)) {} - RemotePullResData data; + PullResData data; private: friend class boost::serialization::access; @@ -362,11 +361,10 @@ class RemotePullRes : public communication::rpc::Message { BOOST_SERIALIZATION_SPLIT_MEMBER() }; -using RemotePullRpc = - communication::rpc::RequestResponse; +using PullRpc = communication::rpc::RequestResponse; // TODO make a separate RPC for the continuation of an existing pull, as an -// optimization not to have to send the full RemotePullReqData pack every +// optimization not to have to send the full PullReqData pack every // time. RPC_SINGLE_MEMBER_MESSAGE(TransactionCommandAdvancedReq, tx::transaction_id_t); diff --git a/src/distributed/remote_pull_rpc_clients.cpp b/src/distributed/pull_rpc_clients.cpp similarity index 85% rename from src/distributed/remote_pull_rpc_clients.cpp rename to src/distributed/pull_rpc_clients.cpp index 0361d851e..b5c1b30bb 100644 --- a/src/distributed/remote_pull_rpc_clients.cpp +++ b/src/distributed/pull_rpc_clients.cpp @@ -1,26 +1,26 @@ #include -#include "distributed/remote_data_manager.hpp" -#include "distributed/remote_pull_rpc_clients.hpp" +#include "distributed/data_manager.hpp" +#include "distributed/pull_rpc_clients.hpp" #include "storage/edge.hpp" #include "storage/vertex.hpp" namespace distributed { -utils::Future RemotePullRpcClients::RemotePull( +utils::Future PullRpcClients::Pull( database::GraphDbAccessor &dba, int worker_id, int64_t plan_id, const Parameters ¶ms, const std::vector &symbols, bool accumulate, int batch_size) { - return clients_.ExecuteOnWorker( + return clients_.ExecuteOnWorker( worker_id, [&dba, plan_id, params, symbols, accumulate, batch_size](ClientPool &client_pool) { - auto result = client_pool.Call( + auto result = client_pool.Call( dba.transaction_id(), dba.transaction().snapshot(), plan_id, params, symbols, accumulate, batch_size, true, true); auto handle_vertex = [&dba](auto &v) { dba.db() - .remote_data_manager() + .data_manager() .Elements(dba.transaction_id()) .emplace(v.global_address.gid(), std::move(v.old_record), std::move(v.new_record)); @@ -31,7 +31,7 @@ utils::Future RemotePullRpcClients::RemotePull( }; auto handle_edge = [&dba](auto &e) { dba.db() - .remote_data_manager() + .data_manager() .Elements(dba.transaction_id()) .emplace(e.global_address.gid(), std::move(e.old_record), std::move(e.new_record)); @@ -61,7 +61,7 @@ utils::Future RemotePullRpcClients::RemotePull( } std::vector> -RemotePullRpcClients::NotifyAllTransactionCommandAdvanced( +PullRpcClients::NotifyAllTransactionCommandAdvanced( tx::transaction_id_t tx_id) { return clients_.ExecuteOnWorkers(0, [tx_id](auto &client) { auto res = client.template Call(tx_id); diff --git a/src/distributed/remote_pull_rpc_clients.hpp b/src/distributed/pull_rpc_clients.hpp similarity index 62% rename from src/distributed/remote_pull_rpc_clients.hpp rename to src/distributed/pull_rpc_clients.hpp index d00d0db9e..7eab90d15 100644 --- a/src/distributed/remote_pull_rpc_clients.hpp +++ b/src/distributed/pull_rpc_clients.hpp @@ -3,7 +3,7 @@ #include #include "database/graph_db_accessor.hpp" -#include "distributed/remote_pull_produce_rpc_messages.hpp" +#include "distributed/pull_produce_rpc_messages.hpp" #include "distributed/rpc_worker_clients.hpp" #include "query/frontend/semantic/symbol.hpp" #include "query/parameters.hpp" @@ -16,23 +16,24 @@ namespace distributed { /// and getting the results of that execution. The results are returned in /// batches and are therefore accompanied with an enum indicator of the state of /// remote execution. -class RemotePullRpcClients { +class PullRpcClients { using ClientPool = communication::rpc::ClientPool; public: - RemotePullRpcClients(RpcWorkerClients &clients) : clients_(clients) {} + PullRpcClients(RpcWorkerClients &clients) : clients_(clients) {} /// Calls a remote pull asynchroniously. IMPORTANT: take care not to call this /// function for the same (tx_id, worker_id, plan_id) before the previous call /// has ended. /// - /// @todo: it might be cleaner to split RemotePull into {InitRemoteCursor, - /// RemotePull, RemoteAccumulate}, but that's a lot of refactoring and more + /// @todo: it might be cleaner to split Pull into {InitRemoteCursor, + /// Pull, RemoteAccumulate}, but that's a lot of refactoring and more /// RPC calls. - utils::Future RemotePull( - database::GraphDbAccessor &dba, int worker_id, int64_t plan_id, - const Parameters ¶ms, const std::vector &symbols, - bool accumulate, int batch_size = kDefaultBatchSize); + utils::Future Pull(database::GraphDbAccessor &dba, int worker_id, + int64_t plan_id, const Parameters ¶ms, + const std::vector &symbols, + bool accumulate, + int batch_size = kDefaultBatchSize); auto GetWorkerIds() { return clients_.GetWorkerIds(); } diff --git a/src/distributed/remote_data_manager.cpp b/src/distributed/remote_data_manager.cpp deleted file mode 100644 index be30a412a..000000000 --- a/src/distributed/remote_data_manager.cpp +++ /dev/null @@ -1,59 +0,0 @@ -#include "distributed/remote_data_manager.hpp" -#include "database/storage.hpp" - -namespace distributed { - -template -RemoteCache &RemoteDataManager::GetCache(CacheT &collection, - tx::transaction_id_t tx_id) { - auto access = collection.access(); - auto found = access.find(tx_id); - if (found != access.end()) return found->second; - - return access - .emplace( - tx_id, std::make_tuple(tx_id), - std::make_tuple(std::ref(storage_), std::ref(remote_data_clients_))) - .first->second; -} - -template <> -RemoteCache &RemoteDataManager::Elements( - tx::transaction_id_t tx_id) { - return GetCache(vertices_caches_, tx_id); -} - -template <> -RemoteCache &RemoteDataManager::Elements( - tx::transaction_id_t tx_id) { - return GetCache(edges_caches_, tx_id); -} - -RemoteDataManager::RemoteDataManager( - database::Storage &storage, - distributed::RemoteDataRpcClients &remote_data_clients) - : storage_(storage), remote_data_clients_(remote_data_clients) {} - -void RemoteDataManager::ClearCacheForSingleTransaction( - tx::transaction_id_t tx_id) { - Elements(tx_id).ClearCache(); - Elements(tx_id).ClearCache(); -} - -void RemoteDataManager::ClearTransactionalCache( - tx::transaction_id_t oldest_active) { - auto vertex_access = vertices_caches_.access(); - for (auto &kv : vertex_access) { - if (kv.first < oldest_active) { - vertex_access.remove(kv.first); - } - } - auto edge_access = edges_caches_.access(); - for (auto &kv : edge_access) { - if (kv.first < oldest_active) { - edge_access.remove(kv.first); - } - } -} - -} // namespace distributed diff --git a/src/distributed/remote_data_rpc_clients.cpp b/src/distributed/remote_data_rpc_clients.cpp deleted file mode 100644 index 6f6b90933..000000000 --- a/src/distributed/remote_data_rpc_clients.cpp +++ /dev/null @@ -1,26 +0,0 @@ -#include "distributed/remote_data_rpc_clients.hpp" -#include "distributed/remote_data_rpc_messages.hpp" -#include "storage/edge.hpp" -#include "storage/vertex.hpp" - -namespace distributed { - -template <> -std::unique_ptr RemoteDataRpcClients::RemoteElement( - int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) { - auto response = clients_.GetClientPool(worker_id).Call( - TxGidPair{tx_id, gid}); - CHECK(response) << "RemoteEdgeRpc failed"; - return std::move(response->name_output_); -} - -template <> -std::unique_ptr RemoteDataRpcClients::RemoteElement( - int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) { - auto response = clients_.GetClientPool(worker_id).Call( - TxGidPair{tx_id, gid}); - CHECK(response) << "RemoteVertexRpc failed"; - return std::move(response->name_output_); -} - -} // namespace distributed diff --git a/src/distributed/remote_data_rpc_server.cpp b/src/distributed/remote_data_rpc_server.cpp deleted file mode 100644 index 0d39b348c..000000000 --- a/src/distributed/remote_data_rpc_server.cpp +++ /dev/null @@ -1,28 +0,0 @@ -#include - -#include "database/graph_db_accessor.hpp" -#include "distributed/remote_data_rpc_messages.hpp" -#include "remote_data_rpc_server.hpp" - -namespace distributed { - -RemoteDataRpcServer::RemoteDataRpcServer(database::GraphDb &db, - communication::rpc::Server &server) - : db_(db), rpc_server_(server) { - rpc_server_.Register([this](const RemoteVertexReq &req) { - database::GraphDbAccessor dba(db_, req.member.tx_id); - auto vertex = dba.FindVertex(req.member.gid, false); - CHECK(vertex.GetOld()) - << "Old record must exist when sending vertex by RPC"; - return std::make_unique(vertex.GetOld(), db_.WorkerId()); - }); - - rpc_server_.Register([this](const RemoteEdgeReq &req) { - database::GraphDbAccessor dba(db_, req.member.tx_id); - auto edge = dba.FindEdge(req.member.gid, false); - CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC"; - return std::make_unique(edge.GetOld(), db_.WorkerId()); - }); -} - -} // namespace distributed diff --git a/src/distributed/remote_updates_rpc_clients.cpp b/src/distributed/remote_updates_rpc_clients.cpp deleted file mode 100644 index 13181b30c..000000000 --- a/src/distributed/remote_updates_rpc_clients.cpp +++ /dev/null @@ -1,125 +0,0 @@ - -#include -#include - -#include "distributed/remote_updates_rpc_clients.hpp" -#include "query/exceptions.hpp" - -namespace distributed { - -namespace { -void RaiseIfRemoteError(RemoteUpdateResult result) { - switch (result) { - case RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: - throw query::RemoveAttachedVertexException(); - case RemoteUpdateResult::SERIALIZATION_ERROR: - throw mvcc::SerializationError(); - case RemoteUpdateResult::LOCK_TIMEOUT_ERROR: - throw LockTimeoutException( - "Remote LockTimeoutError during edge creation"); - case RemoteUpdateResult::UPDATE_DELETED_ERROR: - throw RecordDeletedError(); - case RemoteUpdateResult::DONE: - break; - } -} -} - -RemoteUpdateResult RemoteUpdatesRpcClients::RemoteUpdate( - int worker_id, const database::StateDelta &delta) { - auto res = - worker_clients_.GetClientPool(worker_id).Call(delta); - CHECK(res) << "RemoteUpdateRpc failed on worker: " << worker_id; - return res->member; -} - -gid::Gid RemoteUpdatesRpcClients::RemoteCreateVertex( - int worker_id, tx::transaction_id_t tx_id, - const std::vector &labels, - const std::unordered_map - &properties) { - auto res = - worker_clients_.GetClientPool(worker_id).Call( - RemoteCreateVertexReqData{tx_id, labels, properties}); - CHECK(res) << "RemoteCreateVertexRpc failed on worker: " << worker_id; - CHECK(res->member.result == RemoteUpdateResult::DONE) - << "Remote Vertex creation result not RemoteUpdateResult::DONE"; - return res->member.gid; -} - -storage::EdgeAddress RemoteUpdatesRpcClients::RemoteCreateEdge( - tx::transaction_id_t tx_id, VertexAccessor &from, VertexAccessor &to, - storage::EdgeType edge_type) { - CHECK(from.address().is_remote()) - << "In RemoteCreateEdge `from` must be remote"; - - int from_worker = from.address().worker_id(); - auto res = worker_clients_.GetClientPool(from_worker) - .Call(RemoteCreateEdgeReqData{ - from.gid(), to.GlobalAddress(), edge_type, tx_id}); - CHECK(res) << "RemoteCreateEdge RPC failed on worker: " << from_worker; - RaiseIfRemoteError(res->member.result); - return {res->member.gid, from_worker}; -} - -void RemoteUpdatesRpcClients::RemoteAddInEdge(tx::transaction_id_t tx_id, - VertexAccessor &from, - storage::EdgeAddress edge_address, - VertexAccessor &to, - storage::EdgeType edge_type) { - CHECK(to.address().is_remote() && edge_address.is_remote() && - (from.GlobalAddress().worker_id() != to.address().worker_id())) - << "RemoteAddInEdge should only be called when `to` is remote and " - "`from` is not on the same worker as `to`."; - auto worker_id = to.GlobalAddress().worker_id(); - auto res = worker_clients_.GetClientPool(worker_id).Call( - RemoteAddInEdgeReqData{from.GlobalAddress(), edge_address, to.gid(), - edge_type, tx_id}); - CHECK(res) << "RemoteAddInEdge RPC failed on worker: " << worker_id; - RaiseIfRemoteError(res->member); -} - -void RemoteUpdatesRpcClients::RemoteRemoveVertex(int worker_id, - tx::transaction_id_t tx_id, - gid::Gid gid, - bool check_empty) { - auto res = - worker_clients_.GetClientPool(worker_id).Call( - RemoteRemoveVertexReqData{gid, tx_id, check_empty}); - CHECK(res) << "RemoteRemoveVertex RPC failed on worker: " << worker_id; - RaiseIfRemoteError(res->member); -} - -void RemoteUpdatesRpcClients::RemoteRemoveEdge( - tx::transaction_id_t tx_id, int worker_id, gid::Gid edge_gid, - gid::Gid vertex_from_id, storage::VertexAddress vertex_to_addr) { - auto res = worker_clients_.GetClientPool(worker_id).Call( - RemoteRemoveEdgeData{tx_id, edge_gid, vertex_from_id, vertex_to_addr}); - CHECK(res) << "RemoteRemoveEdge RPC failed on worker: " << worker_id; - RaiseIfRemoteError(res->member); -} - -void RemoteUpdatesRpcClients::RemoteRemoveInEdge( - tx::transaction_id_t tx_id, int worker_id, gid::Gid vertex_id, - storage::EdgeAddress edge_address) { - CHECK(edge_address.is_remote()) - << "RemoteRemoveInEdge edge_address is local."; - auto res = - worker_clients_.GetClientPool(worker_id).Call( - RemoteRemoveInEdgeData{tx_id, vertex_id, edge_address}); - CHECK(res) << "RemoteRemoveInEdge RPC failed on worker: " << worker_id; - RaiseIfRemoteError(res->member); -} - -std::vector> -RemoteUpdatesRpcClients::RemoteUpdateApplyAll(int skip_worker_id, - tx::transaction_id_t tx_id) { - return worker_clients_.ExecuteOnWorkers( - skip_worker_id, [tx_id](auto &client) { - auto res = client.template Call(tx_id); - CHECK(res) << "RemoteUpdateApplyRpc failed"; - return res->member; - }); -} - -} // namespace distributed diff --git a/src/distributed/updates_rpc_clients.cpp b/src/distributed/updates_rpc_clients.cpp new file mode 100644 index 000000000..731b95bb9 --- /dev/null +++ b/src/distributed/updates_rpc_clients.cpp @@ -0,0 +1,116 @@ + +#include +#include + +#include "distributed/updates_rpc_clients.hpp" +#include "query/exceptions.hpp" + +namespace distributed { + +namespace { +void RaiseIfRemoteError(UpdateResult result) { + switch (result) { + case UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: + throw query::RemoveAttachedVertexException(); + case UpdateResult::SERIALIZATION_ERROR: + throw mvcc::SerializationError(); + case UpdateResult::LOCK_TIMEOUT_ERROR: + throw LockTimeoutException( + "Remote LockTimeoutError during edge creation"); + case UpdateResult::UPDATE_DELETED_ERROR: + throw RecordDeletedError(); + case UpdateResult::DONE: + break; + } +} +} + +UpdateResult UpdatesRpcClients::Update(int worker_id, + const database::StateDelta &delta) { + auto res = worker_clients_.GetClientPool(worker_id).Call(delta); + CHECK(res) << "UpdateRpc failed on worker: " << worker_id; + return res->member; +} + +gid::Gid UpdatesRpcClients::CreateVertex( + int worker_id, tx::transaction_id_t tx_id, + const std::vector &labels, + const std::unordered_map + &properties) { + auto res = worker_clients_.GetClientPool(worker_id).Call( + CreateVertexReqData{tx_id, labels, properties}); + CHECK(res) << "CreateVertexRpc failed on worker: " << worker_id; + CHECK(res->member.result == UpdateResult::DONE) + << "Remote Vertex creation result not UpdateResult::DONE"; + return res->member.gid; +} + +storage::EdgeAddress UpdatesRpcClients::CreateEdge( + tx::transaction_id_t tx_id, VertexAccessor &from, VertexAccessor &to, + storage::EdgeType edge_type) { + CHECK(from.address().is_remote()) << "In CreateEdge `from` must be remote"; + + int from_worker = from.address().worker_id(); + auto res = worker_clients_.GetClientPool(from_worker) + .Call(CreateEdgeReqData{ + from.gid(), to.GlobalAddress(), edge_type, tx_id}); + CHECK(res) << "CreateEdge RPC failed on worker: " << from_worker; + RaiseIfRemoteError(res->member.result); + return {res->member.gid, from_worker}; +} + +void UpdatesRpcClients::AddInEdge(tx::transaction_id_t tx_id, + VertexAccessor &from, + storage::EdgeAddress edge_address, + VertexAccessor &to, + storage::EdgeType edge_type) { + CHECK(to.address().is_remote() && edge_address.is_remote() && + (from.GlobalAddress().worker_id() != to.address().worker_id())) + << "AddInEdge should only be called when `to` is remote and " + "`from` is not on the same worker as `to`."; + auto worker_id = to.GlobalAddress().worker_id(); + auto res = worker_clients_.GetClientPool(worker_id).Call( + AddInEdgeReqData{from.GlobalAddress(), edge_address, to.gid(), edge_type, + tx_id}); + CHECK(res) << "AddInEdge RPC failed on worker: " << worker_id; + RaiseIfRemoteError(res->member); +} + +void UpdatesRpcClients::RemoveVertex(int worker_id, tx::transaction_id_t tx_id, + gid::Gid gid, bool check_empty) { + auto res = worker_clients_.GetClientPool(worker_id).Call( + RemoveVertexReqData{gid, tx_id, check_empty}); + CHECK(res) << "RemoveVertex RPC failed on worker: " << worker_id; + RaiseIfRemoteError(res->member); +} + +void UpdatesRpcClients::RemoveEdge(tx::transaction_id_t tx_id, int worker_id, + gid::Gid edge_gid, gid::Gid vertex_from_id, + storage::VertexAddress vertex_to_addr) { + auto res = worker_clients_.GetClientPool(worker_id).Call( + RemoveEdgeData{tx_id, edge_gid, vertex_from_id, vertex_to_addr}); + CHECK(res) << "RemoveEdge RPC failed on worker: " << worker_id; + RaiseIfRemoteError(res->member); +} + +void UpdatesRpcClients::RemoveInEdge(tx::transaction_id_t tx_id, int worker_id, + gid::Gid vertex_id, + storage::EdgeAddress edge_address) { + CHECK(edge_address.is_remote()) << "RemoveInEdge edge_address is local."; + auto res = worker_clients_.GetClientPool(worker_id).Call( + RemoveInEdgeData{tx_id, vertex_id, edge_address}); + CHECK(res) << "RemoveInEdge RPC failed on worker: " << worker_id; + RaiseIfRemoteError(res->member); +} + +std::vector> UpdatesRpcClients::UpdateApplyAll( + int skip_worker_id, tx::transaction_id_t tx_id) { + return worker_clients_.ExecuteOnWorkers( + skip_worker_id, [tx_id](auto &client) { + auto res = client.template Call(tx_id); + CHECK(res) << "UpdateApplyRpc failed"; + return res->member; + }); +} + +} // namespace distributed diff --git a/src/distributed/remote_updates_rpc_clients.hpp b/src/distributed/updates_rpc_clients.hpp similarity index 57% rename from src/distributed/remote_updates_rpc_clients.hpp rename to src/distributed/updates_rpc_clients.hpp index b5f95c3b5..ffbfc60f1 100644 --- a/src/distributed/remote_updates_rpc_clients.hpp +++ b/src/distributed/updates_rpc_clients.hpp @@ -4,8 +4,8 @@ #include #include "database/state_delta.hpp" -#include "distributed/remote_updates_rpc_messages.hpp" #include "distributed/rpc_worker_clients.hpp" +#include "distributed/updates_rpc_messages.hpp" #include "query/typed_value.hpp" #include "storage/address_types.hpp" #include "storage/gid.hpp" @@ -18,17 +18,16 @@ namespace distributed { /// Exposes the functionality to send updates to other workers (that own the /// graph element we are updating). Also enables us to call for a worker to /// apply the accumulated deferred updates, or discard them. -class RemoteUpdatesRpcClients { +class UpdatesRpcClients { public: - explicit RemoteUpdatesRpcClients(RpcWorkerClients &clients) + explicit UpdatesRpcClients(RpcWorkerClients &clients) : worker_clients_(clients) {} /// Sends an update delta to the given worker. - RemoteUpdateResult RemoteUpdate(int worker_id, - const database::StateDelta &delta); + UpdateResult Update(int worker_id, const database::StateDelta &delta); /// Creates a vertex on the given worker and returns it's id. - gid::Gid RemoteCreateVertex( + gid::Gid CreateVertex( int worker_id, tx::transaction_id_t tx_id, const std::vector &labels, const std::unordered_map @@ -37,39 +36,37 @@ class RemoteUpdatesRpcClients { /// Creates an edge on the given worker and returns it's address. If the `to` /// vertex is on the same worker as `from`, then all remote CRUD will be /// handled by a call to this function. Otherwise a separate call to - /// `RemoteAddInEdge` might be necessary. Throws all the exceptions that can + /// `AddInEdge` might be necessary. Throws all the exceptions that can /// occur remotely as a result of updating a vertex. - storage::EdgeAddress RemoteCreateEdge(tx::transaction_id_t tx_id, - VertexAccessor &from, - VertexAccessor &to, - storage::EdgeType edge_type); + storage::EdgeAddress CreateEdge(tx::transaction_id_t tx_id, + VertexAccessor &from, VertexAccessor &to, + storage::EdgeType edge_type); /// Adds the edge with the given address to the `to` vertex as an incoming /// edge. Only used when `to` is remote and not on the same worker as `from`. - void RemoteAddInEdge(tx::transaction_id_t tx_id, VertexAccessor &from, - storage::EdgeAddress edge_address, VertexAccessor &to, - storage::EdgeType edge_type); + void AddInEdge(tx::transaction_id_t tx_id, VertexAccessor &from, + storage::EdgeAddress edge_address, VertexAccessor &to, + storage::EdgeType edge_type); /// Removes a vertex from the other worker. - void RemoteRemoveVertex(int worker_id, tx::transaction_id_t tx_id, - gid::Gid gid, bool check_empty); + void RemoveVertex(int worker_id, tx::transaction_id_t tx_id, gid::Gid gid, + bool check_empty); /// Removes an edge on another worker. This also handles the `from` vertex /// outgoing edge, as that vertex is on the same worker as the edge. If the /// `to` vertex is on the same worker, then that side is handled too by the /// single RPC call, otherwise a separate call has to be made to - /// RemoteRemoveInEdge. - void RemoteRemoveEdge(tx::transaction_id_t tx_id, int worker_id, - gid::Gid edge_gid, gid::Gid vertex_from_id, - storage::VertexAddress vertex_to_addr); + /// RemoveInEdge. + void RemoveEdge(tx::transaction_id_t tx_id, int worker_id, gid::Gid edge_gid, + gid::Gid vertex_from_id, + storage::VertexAddress vertex_to_addr); - void RemoteRemoveInEdge(tx::transaction_id_t tx_id, int worker_id, - gid::Gid vertex_id, - storage::EdgeAddress edge_address); + void RemoveInEdge(tx::transaction_id_t tx_id, int worker_id, + gid::Gid vertex_id, storage::EdgeAddress edge_address); /// Calls for all the workers (except the given one) to apply their updates /// and returns the future results. - std::vector> RemoteUpdateApplyAll( + std::vector> UpdateApplyAll( int skip_worker_id, tx::transaction_id_t tx_id); private: diff --git a/src/distributed/remote_updates_rpc_messages.hpp b/src/distributed/updates_rpc_messages.hpp similarity index 55% rename from src/distributed/remote_updates_rpc_messages.hpp rename to src/distributed/updates_rpc_messages.hpp index dc68e003b..218f927cd 100644 --- a/src/distributed/remote_updates_rpc_messages.hpp +++ b/src/distributed/updates_rpc_messages.hpp @@ -14,7 +14,7 @@ namespace distributed { /// The result of sending or applying a deferred update to a worker. -enum class RemoteUpdateResult { +enum class UpdateResult { DONE, SERIALIZATION_ERROR, LOCK_TIMEOUT_ERROR, @@ -22,19 +22,17 @@ enum class RemoteUpdateResult { UNABLE_TO_DELETE_VERTEX_ERROR }; -RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateReq, database::StateDelta); -RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateRes, RemoteUpdateResult); -using RemoteUpdateRpc = - communication::rpc::RequestResponse; +RPC_SINGLE_MEMBER_MESSAGE(UpdateReq, database::StateDelta); +RPC_SINGLE_MEMBER_MESSAGE(UpdateRes, UpdateResult); +using UpdateRpc = communication::rpc::RequestResponse; -RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateApplyReq, tx::transaction_id_t); -RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateApplyRes, RemoteUpdateResult); -using RemoteUpdateApplyRpc = - communication::rpc::RequestResponse; +RPC_SINGLE_MEMBER_MESSAGE(UpdateApplyReq, tx::transaction_id_t); +RPC_SINGLE_MEMBER_MESSAGE(UpdateApplyRes, UpdateResult); +using UpdateApplyRpc = + communication::rpc::RequestResponse; -struct RemoteCreateResult { - RemoteUpdateResult result; +struct CreateResult { + UpdateResult result; // Only valid if creation was successful. gid::Gid gid; @@ -48,7 +46,7 @@ struct RemoteCreateResult { } }; -struct RemoteCreateVertexReqData { +struct CreateVertexReqData { tx::transaction_id_t tx_id; std::vector labels; std::unordered_map properties; @@ -84,13 +82,12 @@ struct RemoteCreateVertexReqData { BOOST_SERIALIZATION_SPLIT_MEMBER() }; -RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateVertexReq, RemoteCreateVertexReqData); -RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateVertexRes, RemoteCreateResult); -using RemoteCreateVertexRpc = - communication::rpc::RequestResponse; +RPC_SINGLE_MEMBER_MESSAGE(CreateVertexReq, CreateVertexReqData); +RPC_SINGLE_MEMBER_MESSAGE(CreateVertexRes, CreateResult); +using CreateVertexRpc = + communication::rpc::RequestResponse; -struct RemoteCreateEdgeReqData { +struct CreateEdgeReqData { gid::Gid from; storage::VertexAddress to; storage::EdgeType edge_type; @@ -108,13 +105,12 @@ struct RemoteCreateEdgeReqData { } }; -RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateEdgeReq, RemoteCreateEdgeReqData); -RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateEdgeRes, RemoteCreateResult); -using RemoteCreateEdgeRpc = - communication::rpc::RequestResponse; +RPC_SINGLE_MEMBER_MESSAGE(CreateEdgeReq, CreateEdgeReqData); +RPC_SINGLE_MEMBER_MESSAGE(CreateEdgeRes, CreateResult); +using CreateEdgeRpc = + communication::rpc::RequestResponse; -struct RemoteAddInEdgeReqData { +struct AddInEdgeReqData { storage::VertexAddress from; storage::EdgeAddress edge_address; gid::Gid to; @@ -134,12 +130,12 @@ struct RemoteAddInEdgeReqData { } }; -RPC_SINGLE_MEMBER_MESSAGE(RemoteAddInEdgeReq, RemoteAddInEdgeReqData); -RPC_SINGLE_MEMBER_MESSAGE(RemoteAddInEdgeRes, RemoteUpdateResult); -using RemoteAddInEdgeRpc = - communication::rpc::RequestResponse; +RPC_SINGLE_MEMBER_MESSAGE(AddInEdgeReq, AddInEdgeReqData); +RPC_SINGLE_MEMBER_MESSAGE(AddInEdgeRes, UpdateResult); +using AddInEdgeRpc = + communication::rpc::RequestResponse; -struct RemoteRemoveVertexReqData { +struct RemoveVertexReqData { gid::Gid gid; tx::transaction_id_t tx_id; bool check_empty; @@ -155,13 +151,12 @@ struct RemoteRemoveVertexReqData { } }; -RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveVertexReq, RemoteRemoveVertexReqData); -RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveVertexRes, RemoteUpdateResult); -using RemoteRemoveVertexRpc = - communication::rpc::RequestResponse; +RPC_SINGLE_MEMBER_MESSAGE(RemoveVertexReq, RemoveVertexReqData); +RPC_SINGLE_MEMBER_MESSAGE(RemoveVertexRes, UpdateResult); +using RemoveVertexRpc = + communication::rpc::RequestResponse; -struct RemoteRemoveEdgeData { +struct RemoveEdgeData { tx::transaction_id_t tx_id; gid::Gid edge_id; gid::Gid vertex_from_id; @@ -179,13 +174,12 @@ struct RemoteRemoveEdgeData { } }; -RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveEdgeReq, RemoteRemoveEdgeData); -RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveEdgeRes, RemoteUpdateResult); -using RemoteRemoveEdgeRpc = - communication::rpc::RequestResponse; +RPC_SINGLE_MEMBER_MESSAGE(RemoveEdgeReq, RemoveEdgeData); +RPC_SINGLE_MEMBER_MESSAGE(RemoveEdgeRes, UpdateResult); +using RemoveEdgeRpc = + communication::rpc::RequestResponse; -struct RemoteRemoveInEdgeData { +struct RemoveInEdgeData { tx::transaction_id_t tx_id; gid::Gid vertex; storage::EdgeAddress edge_address; @@ -201,10 +195,9 @@ struct RemoteRemoveInEdgeData { } }; -RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveInEdgeReq, RemoteRemoveInEdgeData); -RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveInEdgeRes, RemoteUpdateResult); -using RemoteRemoveInEdgeRpc = - communication::rpc::RequestResponse; +RPC_SINGLE_MEMBER_MESSAGE(RemoveInEdgeReq, RemoveInEdgeData); +RPC_SINGLE_MEMBER_MESSAGE(RemoveInEdgeRes, UpdateResult); +using RemoveInEdgeRpc = + communication::rpc::RequestResponse; } // namespace distributed diff --git a/src/distributed/remote_updates_rpc_server.cpp b/src/distributed/updates_rpc_server.cpp similarity index 73% rename from src/distributed/remote_updates_rpc_server.cpp rename to src/distributed/updates_rpc_server.cpp index 980ecc648..46d9f7a3c 100644 --- a/src/distributed/remote_updates_rpc_server.cpp +++ b/src/distributed/updates_rpc_server.cpp @@ -2,14 +2,13 @@ #include "glog/logging.h" -#include "distributed/remote_updates_rpc_server.hpp" +#include "distributed/updates_rpc_server.hpp" #include "threading/sync/lock_timeout_exception.hpp" namespace distributed { template -RemoteUpdateResult -RemoteUpdatesRpcServer::TransactionUpdates::Emplace( +UpdateResult UpdatesRpcServer::TransactionUpdates::Emplace( const database::StateDelta &delta) { auto gid = std::is_same::value ? delta.vertex_id @@ -50,18 +49,17 @@ RemoteUpdatesRpcServer::TransactionUpdates::Emplace( // try { // found->second.first.update(); // } catch (const mvcc::SerializationError &) { - // return RemoteUpdateResult::SERIALIZATION_ERROR; + // return UpdateResult::SERIALIZATION_ERROR; // } catch (const RecordDeletedError &) { - // return RemoteUpdateResult::UPDATE_DELETED_ERROR; + // return UpdateResult::UPDATE_DELETED_ERROR; // } catch (const LockTimeoutException &) { - // return RemoteUpdateResult::LOCK_TIMEOUT_ERROR; + // return UpdateResult::LOCK_TIMEOUT_ERROR; // } - return RemoteUpdateResult::DONE; + return UpdateResult::DONE; } template -gid::Gid -RemoteUpdatesRpcServer::TransactionUpdates::CreateVertex( +gid::Gid UpdatesRpcServer::TransactionUpdates::CreateVertex( const std::vector &labels, const std::unordered_map &properties) { @@ -75,8 +73,7 @@ RemoteUpdatesRpcServer::TransactionUpdates::CreateVertex( } template -gid::Gid -RemoteUpdatesRpcServer::TransactionUpdates::CreateEdge( +gid::Gid UpdatesRpcServer::TransactionUpdates::CreateEdge( gid::Gid from, storage::VertexAddress to, storage::EdgeType edge_type) { auto &db = db_accessor_.db(); auto edge = db_accessor_.InsertOnlyEdge( @@ -89,8 +86,7 @@ RemoteUpdatesRpcServer::TransactionUpdates::CreateEdge( } template -RemoteUpdateResult -RemoteUpdatesRpcServer::TransactionUpdates::Apply() { +UpdateResult UpdatesRpcServer::TransactionUpdates::Apply() { std::lock_guard guard{lock_}; for (auto &kv : deltas_) { auto &record_accessor = kv.second.first; @@ -113,7 +109,7 @@ RemoteUpdatesRpcServer::TransactionUpdates::Apply() { if (!db_accessor().RemoveVertex( reinterpret_cast(record_accessor), delta.check_empty)) { - return RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR; + return UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR; } break; case database::StateDelta::Type::SET_PROPERTY_VERTEX: @@ -164,21 +160,21 @@ RemoteUpdatesRpcServer::TransactionUpdates::Apply() { break; } } catch (const mvcc::SerializationError &) { - return RemoteUpdateResult::SERIALIZATION_ERROR; + return UpdateResult::SERIALIZATION_ERROR; } catch (const RecordDeletedError &) { - return RemoteUpdateResult::UPDATE_DELETED_ERROR; + return UpdateResult::UPDATE_DELETED_ERROR; } catch (const LockTimeoutException &) { - return RemoteUpdateResult::LOCK_TIMEOUT_ERROR; + return UpdateResult::LOCK_TIMEOUT_ERROR; } } } - return RemoteUpdateResult::DONE; + return UpdateResult::DONE; } -RemoteUpdatesRpcServer::RemoteUpdatesRpcServer( - database::GraphDb &db, communication::rpc::Server &server) +UpdatesRpcServer::UpdatesRpcServer(database::GraphDb &db, + communication::rpc::Server &server) : db_(db) { - server.Register([this](const RemoteUpdateReq &req) { + server.Register([this](const UpdateReq &req) { using DeltaType = database::StateDelta::Type; auto &delta = req.member; switch (delta.type) { @@ -187,10 +183,10 @@ RemoteUpdatesRpcServer::RemoteUpdatesRpcServer( case DeltaType::REMOVE_LABEL: case database::StateDelta::Type::REMOVE_OUT_EDGE: case database::StateDelta::Type::REMOVE_IN_EDGE: - return std::make_unique( + return std::make_unique( GetUpdates(vertex_updates_, delta.transaction_id).Emplace(delta)); case DeltaType::SET_PROPERTY_EDGE: - return std::make_unique( + return std::make_unique( GetUpdates(edge_updates_, delta.transaction_id).Emplace(delta)); default: LOG(FATAL) << "Can't perform a remote update with delta type: " @@ -198,26 +194,24 @@ RemoteUpdatesRpcServer::RemoteUpdatesRpcServer( } }); - server.Register( - [this](const RemoteUpdateApplyReq &req) { - return std::make_unique(Apply(req.member)); - }); - - server.Register([this]( - const RemoteCreateVertexReq &req) { - gid::Gid gid = GetUpdates(vertex_updates_, req.member.tx_id) - .CreateVertex(req.member.labels, req.member.properties); - return std::make_unique( - RemoteCreateResult{RemoteUpdateResult::DONE, gid}); + server.Register([this](const UpdateApplyReq &req) { + return std::make_unique(Apply(req.member)); }); - server.Register([this](const RemoteCreateEdgeReq &req) { + server.Register([this](const CreateVertexReq &req) { + gid::Gid gid = GetUpdates(vertex_updates_, req.member.tx_id) + .CreateVertex(req.member.labels, req.member.properties); + return std::make_unique( + CreateResult{UpdateResult::DONE, gid}); + }); + + server.Register([this](const CreateEdgeReq &req) { auto data = req.member; auto creation_result = CreateEdge(data); // If `from` and `to` are both on this worker, we handle it in this // RPC call. Do it only if CreateEdge succeeded. - if (creation_result.result == RemoteUpdateResult::DONE && + if (creation_result.result == UpdateResult::DONE && data.to.worker_id() == db_.WorkerId()) { auto to_delta = database::StateDelta::AddInEdge( data.tx_id, data.to.gid(), {data.from, db_.WorkerId()}, @@ -226,47 +220,45 @@ RemoteUpdatesRpcServer::RemoteUpdatesRpcServer( GetUpdates(vertex_updates_, data.tx_id).Emplace(to_delta); } - return std::make_unique(creation_result); + return std::make_unique(creation_result); }); - server.Register([this](const RemoteAddInEdgeReq &req) { + server.Register([this](const AddInEdgeReq &req) { auto to_delta = database::StateDelta::AddInEdge( req.member.tx_id, req.member.to, req.member.from, req.member.edge_address, req.member.edge_type); auto result = GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta); - return std::make_unique(result); + return std::make_unique(result); }); - server.Register( - [this](const RemoteRemoveVertexReq &req) { - auto to_delta = database::StateDelta::RemoveVertex( - req.member.tx_id, req.member.gid, req.member.check_empty); - auto result = - GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta); - return std::make_unique(result); - }); - - server.Register([this](const RemoteRemoveEdgeReq &req) { - return std::make_unique(RemoveEdge(req.member)); + server.Register([this](const RemoveVertexReq &req) { + auto to_delta = database::StateDelta::RemoveVertex( + req.member.tx_id, req.member.gid, req.member.check_empty); + auto result = + GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta); + return std::make_unique(result); }); - server.Register( - [this](const RemoteRemoveInEdgeReq &req) { - auto data = req.member; - return std::make_unique( - GetUpdates(vertex_updates_, data.tx_id) - .Emplace(database::StateDelta::RemoveInEdge( - data.tx_id, data.vertex, data.edge_address))); - }); + server.Register([this](const RemoveEdgeReq &req) { + return std::make_unique(RemoveEdge(req.member)); + }); + + server.Register([this](const RemoveInEdgeReq &req) { + auto data = req.member; + return std::make_unique( + GetUpdates(vertex_updates_, data.tx_id) + .Emplace(database::StateDelta::RemoveInEdge(data.tx_id, data.vertex, + data.edge_address))); + }); } -RemoteUpdateResult RemoteUpdatesRpcServer::Apply(tx::transaction_id_t tx_id) { +UpdateResult UpdatesRpcServer::Apply(tx::transaction_id_t tx_id) { auto apply = [tx_id](auto &collection) { auto access = collection.access(); auto found = access.find(tx_id); if (found == access.end()) { - return RemoteUpdateResult::DONE; + return UpdateResult::DONE; } auto result = found->second.Apply(); access.remove(tx_id); @@ -275,12 +267,12 @@ RemoteUpdateResult RemoteUpdatesRpcServer::Apply(tx::transaction_id_t tx_id) { auto vertex_result = apply(vertex_updates_); auto edge_result = apply(edge_updates_); - if (vertex_result != RemoteUpdateResult::DONE) return vertex_result; - if (edge_result != RemoteUpdateResult::DONE) return edge_result; - return RemoteUpdateResult::DONE; + if (vertex_result != UpdateResult::DONE) return vertex_result; + if (edge_result != UpdateResult::DONE) return edge_result; + return UpdateResult::DONE; } -void RemoteUpdatesRpcServer::ClearTransactionalCache( +void UpdatesRpcServer::ClearTransactionalCache( tx::transaction_id_t oldest_active) { auto vertex_access = vertex_updates_.access(); for (auto &kv : vertex_access) { @@ -298,17 +290,15 @@ void RemoteUpdatesRpcServer::ClearTransactionalCache( // Gets/creates the TransactionUpdates for the given transaction. template -RemoteUpdatesRpcServer::TransactionUpdates - &RemoteUpdatesRpcServer::GetUpdates(MapT &updates, - tx::transaction_id_t tx_id) { +UpdatesRpcServer::TransactionUpdates &UpdatesRpcServer::GetUpdates( + MapT &updates, tx::transaction_id_t tx_id) { return updates.access() .emplace(tx_id, std::make_tuple(tx_id), std::make_tuple(std::ref(db_), tx_id)) .first->second; } -RemoteCreateResult RemoteUpdatesRpcServer::CreateEdge( - const RemoteCreateEdgeReqData &req) { +CreateResult UpdatesRpcServer::CreateEdge(const CreateEdgeReqData &req) { auto gid = GetUpdates(edge_updates_, req.tx_id) .CreateEdge(req.from, req.to, req.edge_type); @@ -319,22 +309,21 @@ RemoteCreateResult RemoteUpdatesRpcServer::CreateEdge( return {result, gid}; } -RemoteUpdateResult RemoteUpdatesRpcServer::RemoveEdge( - const RemoteRemoveEdgeData &data) { +UpdateResult UpdatesRpcServer::RemoveEdge(const RemoveEdgeData &data) { // Edge removal. auto deletion_delta = database::StateDelta::RemoveEdge(data.tx_id, data.edge_id); auto result = GetUpdates(edge_updates_, data.tx_id).Emplace(deletion_delta); // Out-edge removal, for sure is local. - if (result == RemoteUpdateResult::DONE) { + if (result == UpdateResult::DONE) { auto remove_out_delta = database::StateDelta::RemoveOutEdge( data.tx_id, data.vertex_from_id, {data.edge_id, db_.WorkerId()}); result = GetUpdates(vertex_updates_, data.tx_id).Emplace(remove_out_delta); } // In-edge removal, might not be local. - if (result == RemoteUpdateResult::DONE && + if (result == UpdateResult::DONE && data.vertex_to_address.worker_id() == db_.WorkerId()) { auto remove_in_delta = database::StateDelta::RemoveInEdge( data.tx_id, data.vertex_to_address.gid(), @@ -347,14 +336,13 @@ RemoteUpdateResult RemoteUpdatesRpcServer::RemoveEdge( template <> VertexAccessor -RemoteUpdatesRpcServer::TransactionUpdates::FindAccessor( +UpdatesRpcServer::TransactionUpdates::FindAccessor( gid::Gid gid) { return db_accessor_.FindVertex(gid, false); } template <> -EdgeAccessor -RemoteUpdatesRpcServer::TransactionUpdates::FindAccessor( +EdgeAccessor UpdatesRpcServer::TransactionUpdates::FindAccessor( gid::Gid gid) { return db_accessor_.FindEdge(gid, false); } diff --git a/src/distributed/remote_updates_rpc_server.hpp b/src/distributed/updates_rpc_server.hpp similarity index 87% rename from src/distributed/remote_updates_rpc_server.hpp rename to src/distributed/updates_rpc_server.hpp index ae8e8d765..f12ba7b37 100644 --- a/src/distributed/remote_updates_rpc_server.hpp +++ b/src/distributed/updates_rpc_server.hpp @@ -10,7 +10,7 @@ #include "database/graph_db.hpp" #include "database/graph_db_accessor.hpp" #include "database/state_delta.hpp" -#include "distributed/remote_updates_rpc_messages.hpp" +#include "distributed/updates_rpc_messages.hpp" #include "query/typed_value.hpp" #include "storage/edge_accessor.hpp" #include "storage/gid.hpp" @@ -27,7 +27,7 @@ namespace distributed { /// /// Attempts to get serialization and update-after-delete errors to happen as /// soon as possible during query execution (fail fast). -class RemoteUpdatesRpcServer { +class UpdatesRpcServer { // Remote updates for one transaction. template class TransactionUpdates { @@ -38,7 +38,7 @@ class RemoteUpdatesRpcServer { /// Adds a delta and returns the result. Does not modify the state (data) of /// the graph element the update is for, but calls the `update` method to /// fail-fast on serialization and update-after-delete errors. - RemoteUpdateResult Emplace(const database::StateDelta &delta); + UpdateResult Emplace(const database::StateDelta &delta); /// Creates a new vertex and returns it's gid. gid::Gid CreateVertex( @@ -52,7 +52,7 @@ class RemoteUpdatesRpcServer { storage::EdgeType edge_type); /// Applies all the deltas on the record. - RemoteUpdateResult Apply(); + UpdateResult Apply(); auto &db_accessor() { return db_accessor_; } @@ -69,13 +69,12 @@ class RemoteUpdatesRpcServer { }; public: - RemoteUpdatesRpcServer(database::GraphDb &db, - communication::rpc::Server &server); + UpdatesRpcServer(database::GraphDb &db, communication::rpc::Server &server); /// Applies all existsing updates for the given transaction ID. If there are /// no updates for that transaction, nothing happens. Clears the updates cache /// after applying them, regardless of the result. - RemoteUpdateResult Apply(tx::transaction_id_t tx_id); + UpdateResult Apply(tx::transaction_id_t tx_id); /// Clears the cache of local transactions that have expired. The signature of /// this method is dictated by `distributed::CacheCleaner`. @@ -96,10 +95,10 @@ class RemoteUpdatesRpcServer { tx::transaction_id_t tx_id); // Performs edge creation for the given request. - RemoteCreateResult CreateEdge(const RemoteCreateEdgeReqData &req); + CreateResult CreateEdge(const CreateEdgeReqData &req); // Performs edge removal for the given request. - RemoteUpdateResult RemoveEdge(const RemoteRemoveEdgeData &data); + UpdateResult RemoveEdge(const RemoveEdgeData &data); }; } // namespace distributed diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 3748e9acd..620f97d90 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -15,9 +15,9 @@ #include "glog/logging.h" #include "database/graph_db_accessor.hpp" -#include "distributed/remote_pull_rpc_clients.hpp" -#include "distributed/remote_updates_rpc_clients.hpp" -#include "distributed/remote_updates_rpc_server.hpp" +#include "distributed/pull_rpc_clients.hpp" +#include "distributed/updates_rpc_clients.hpp" +#include "distributed/updates_rpc_server.hpp" #include "query/context.hpp" #include "query/exceptions.hpp" #include "query/frontend/ast/ast.hpp" @@ -3014,7 +3014,7 @@ class RemotePuller { RemotePuller(database::GraphDbAccessor &db, const std::vector &symbols, int64_t plan_id) : db_(db), symbols_(symbols), plan_id_(plan_id) { - worker_ids_ = db_.db().remote_pull_clients().GetWorkerIds(); + worker_ids_ = db_.db().pull_clients().GetWorkerIds(); // Remove master from the worker ids list. worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0)); } @@ -3022,7 +3022,7 @@ class RemotePuller { void Initialize(Context &context) { if (!remote_pulls_initialized_) { for (auto &worker_id : worker_ids_) { - UpdateRemotePullForWorker(worker_id, context); + UpdatePullForWorker(worker_id, context); } remote_pulls_initialized_ = true; } @@ -3052,30 +3052,30 @@ class RemotePuller { auto remote_results = remote_pull.get(); switch (remote_results.pull_state) { - case distributed::RemotePullState::CURSOR_EXHAUSTED: + case distributed::PullState::CURSOR_EXHAUSTED: move_frames(worker_id, remote_results); remote_pulls_.erase(found_it); break; - case distributed::RemotePullState::CURSOR_IN_PROGRESS: + case distributed::PullState::CURSOR_IN_PROGRESS: move_frames(worker_id, remote_results); - UpdateRemotePullForWorker(worker_id, context); + UpdatePullForWorker(worker_id, context); break; - case distributed::RemotePullState::SERIALIZATION_ERROR: + case distributed::PullState::SERIALIZATION_ERROR: throw mvcc::SerializationError( "Serialization error occured during PullRemote !"); - case distributed::RemotePullState::LOCK_TIMEOUT_ERROR: + case distributed::PullState::LOCK_TIMEOUT_ERROR: throw LockTimeoutException( "LockTimeout error occured during PullRemote !"); - case distributed::RemotePullState::UPDATE_DELETED_ERROR: + case distributed::PullState::UPDATE_DELETED_ERROR: throw QueryRuntimeException( "RecordDeleted error ocured during PullRemote !"); - case distributed::RemotePullState::RECONSTRUCTION_ERROR: + case distributed::PullState::RECONSTRUCTION_ERROR: throw query::ReconstructionException(); - case distributed::RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR: + case distributed::PullState::UNABLE_TO_DELETE_VERTEX_ERROR: throw RemoveAttachedVertexException(); - case distributed::RemotePullState::HINTED_ABORT_ERROR: + case distributed::PullState::HINTED_ABORT_ERROR: throw HintedAbortError(); - case distributed::RemotePullState::QUERY_ERROR: + case distributed::PullState::QUERY_ERROR: throw QueryRuntimeException( "Query runtime error occurred duing PullRemote !"); } @@ -3119,15 +3119,14 @@ class RemotePuller { database::GraphDbAccessor &db_; std::vector symbols_; int64_t plan_id_; - std::unordered_map> - remote_pulls_; + std::unordered_map> remote_pulls_; std::unordered_map>> remote_results_; std::vector worker_ids_; bool remote_pulls_initialized_ = false; - void UpdateRemotePullForWorker(int worker_id, Context &context) { - remote_pulls_[worker_id] = db_.db().remote_pull_clients().RemotePull( + void UpdatePullForWorker(int worker_id, Context &context) { + remote_pulls_[worker_id] = db_.db().pull_clients().Pull( db_, worker_id, plan_id_, context.parameters_, symbols_, false); } }; @@ -3258,12 +3257,11 @@ class SynchronizeCursor : public Cursor { auto &db = context.db_accessor_.db(); // Tell all workers to accumulate, only if there is a remote pull. - std::vector> - worker_accumulations; + std::vector> worker_accumulations; if (pull_remote_cursor_) { - for (auto worker_id : db.remote_pull_clients().GetWorkerIds()) { + for (auto worker_id : db.pull_clients().GetWorkerIds()) { if (worker_id == db.WorkerId()) continue; - worker_accumulations.emplace_back(db.remote_pull_clients().RemotePull( + worker_accumulations.emplace_back(db.pull_clients().Pull( context.db_accessor_, worker_id, self_.pull_remote()->plan_id(), context.parameters_, self_.pull_remote()->symbols(), true, 0)); } @@ -3282,29 +3280,29 @@ class SynchronizeCursor : public Cursor { // Wait for all workers to finish accumulation (first sync point). for (auto &accu : worker_accumulations) { switch (accu.get().pull_state) { - case distributed::RemotePullState::CURSOR_EXHAUSTED: + case distributed::PullState::CURSOR_EXHAUSTED: continue; - case distributed::RemotePullState::CURSOR_IN_PROGRESS: + case distributed::PullState::CURSOR_IN_PROGRESS: throw QueryRuntimeException( "Expected exhausted cursor after remote pull accumulate"); - case distributed::RemotePullState::SERIALIZATION_ERROR: + case distributed::PullState::SERIALIZATION_ERROR: throw mvcc::SerializationError( "Failed to perform remote accumulate due to SerializationError"); - case distributed::RemotePullState::UPDATE_DELETED_ERROR: + case distributed::PullState::UPDATE_DELETED_ERROR: throw QueryRuntimeException( "Failed to perform remote accumulate due to RecordDeletedError"); - case distributed::RemotePullState::LOCK_TIMEOUT_ERROR: + case distributed::PullState::LOCK_TIMEOUT_ERROR: throw LockTimeoutException( "Failed to perform remote accumulate due to " "LockTimeoutException"); - case distributed::RemotePullState::RECONSTRUCTION_ERROR: + case distributed::PullState::RECONSTRUCTION_ERROR: throw QueryRuntimeException( "Failed to perform remote accumulate due to ReconstructionError"); - case distributed::RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR: + case distributed::PullState::UNABLE_TO_DELETE_VERTEX_ERROR: throw RemoveAttachedVertexException(); - case distributed::RemotePullState::HINTED_ABORT_ERROR: + case distributed::PullState::HINTED_ABORT_ERROR: throw HintedAbortError(); - case distributed::RemotePullState::QUERY_ERROR: + case distributed::PullState::QUERY_ERROR: throw QueryRuntimeException( "Failed to perform remote accumulate due to Query runtime error"); } @@ -3317,22 +3315,22 @@ class SynchronizeCursor : public Cursor { // Make all the workers apply their deltas. auto tx_id = context.db_accessor_.transaction_id(); auto apply_futures = - db.remote_updates_clients().RemoteUpdateApplyAll(db.WorkerId(), tx_id); - db.remote_updates_server().Apply(tx_id); + db.updates_clients().UpdateApplyAll(db.WorkerId(), tx_id); + db.updates_server().Apply(tx_id); for (auto &future : apply_futures) { switch (future.get()) { - case distributed::RemoteUpdateResult::SERIALIZATION_ERROR: + case distributed::UpdateResult::SERIALIZATION_ERROR: throw mvcc::SerializationError( "Failed to apply deferred updates due to SerializationError"); - case distributed::RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: + case distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: throw RemoveAttachedVertexException(); - case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR: + case distributed::UpdateResult::UPDATE_DELETED_ERROR: throw QueryRuntimeException( "Failed to apply deferred updates due to RecordDeletedError"); - case distributed::RemoteUpdateResult::LOCK_TIMEOUT_ERROR: + case distributed::UpdateResult::LOCK_TIMEOUT_ERROR: throw LockTimeoutException( "Failed to apply deferred update due to LockTimeoutException"); - case distributed::RemoteUpdateResult::DONE: + case distributed::UpdateResult::DONE: break; } } @@ -3340,7 +3338,7 @@ class SynchronizeCursor : public Cursor { // If the command advanced, let the workers know. if (self_.advance_command()) { auto futures = - db.remote_pull_clients().NotifyAllTransactionCommandAdvanced(tx_id); + db.pull_clients().NotifyAllTransactionCommandAdvanced(tx_id); for (auto &future : futures) future.wait(); } } diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp index e81354381..73085b895 100644 --- a/src/query/plan/operator.hpp +++ b/src/query/plan/operator.hpp @@ -17,7 +17,7 @@ #include "boost/serialization/shared_ptr.hpp" #include "boost/serialization/unique_ptr.hpp" -#include "distributed/remote_pull_produce_rpc_messages.hpp" +#include "distributed/pull_produce_rpc_messages.hpp" #include "query/common.hpp" #include "query/frontend/ast/ast.hpp" #include "query/frontend/semantic/symbol.hpp" @@ -2463,7 +2463,7 @@ class Union : public LogicalOperator { /** * An operator in distributed Memgraph that yields both local and remote (from * other workers) frames. Obtaining remote frames is done through RPC calls to - * `distributed::RemoteProduceRpcServer`s running on all the workers. + * `distributed::ProduceRpcServer`s running on all the workers. * * This operator aims to yield results as fast as possible and lose minimal * time on data transfer. It gives no guarantees on result order. @@ -2514,7 +2514,7 @@ class PullRemote : public LogicalOperator { * * Logic of the synchronize operator is: * - * 1. If there is a RemotePull, tell all the workers to pull on that plan and + * 1. If there is a Pull, tell all the workers to pull on that plan and * accumulate results without sending them to the master. This is async. * 2. Accumulate local results, in parallel with 1. getting executed on workers. * 3. Wait till the master and all the workers are done accumulating. @@ -2522,7 +2522,7 @@ class PullRemote : public LogicalOperator { * 5. Tell all the workers to apply their updates. This is async. * 6. Apply local updates, in parallel with 5. on the workers. * 7. Notify workers that the command has advanced, if necessary. - * 8. Yield all the results, first local, then from RemotePull if available. + * 8. Yield all the results, first local, then from Pull if available. */ class Synchronize : public LogicalOperator { public: diff --git a/src/storage/record_accessor.cpp b/src/storage/record_accessor.cpp index 4b566e489..e5b826a5f 100644 --- a/src/storage/record_accessor.cpp +++ b/src/storage/record_accessor.cpp @@ -2,8 +2,8 @@ #include "database/graph_db_accessor.hpp" #include "database/state_delta.hpp" -#include "distributed/remote_data_manager.hpp" -#include "distributed/remote_updates_rpc_clients.hpp" +#include "distributed/data_manager.hpp" +#include "distributed/updates_rpc_clients.hpp" #include "query/exceptions.hpp" #include "storage/edge.hpp" #include "storage/record_accessor.hpp" @@ -148,13 +148,12 @@ bool RecordAccessor::Reconstruct() const { // It's not possible that we have a global address for a graph element // that's local, because that is resolved in the constructor. // TODO in write queries it's possible the command has been advanced and - // we need to invalidate the RemoteCache and really get the latest stuff. + // we need to invalidate the Cache and really get the latest stuff. // But only do that after the command has been advanced. - auto &remote_cache = - dba.db().remote_data_manager().template Elements( - dba.transaction_id()); - remote_cache.FindSetOldNew(dba.transaction().id_, address_.worker_id(), - address_.gid(), old_, new_); + auto &cache = dba.db().data_manager().template Elements( + dba.transaction_id()); + cache.FindSetOldNew(dba.transaction().id_, address_.worker_id(), + address_.gid(), old_, new_); } current_ = old_ ? old_ : new_; return old_ != nullptr || new_ != nullptr; @@ -180,10 +179,9 @@ TRecord &RecordAccessor::update() const { if (is_local()) { new_ = address_.local()->update(t); } else { - auto &remote_cache = - dba.db().remote_data_manager().template Elements( - dba.transaction_id()); - new_ = remote_cache.FindNew(address_.gid()); + auto &cache = dba.db().data_manager().template Elements( + dba.transaction_id()); + new_ = cache.FindNew(address_.gid()); } DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update"; return *new_; @@ -204,18 +202,18 @@ void RecordAccessor::SendDelta( DCHECK(!is_local()) << "Only a delta created on a remote accessor should be sent"; - auto result = db_accessor().db().remote_updates_clients().RemoteUpdate( - address().worker_id(), delta); + auto result = + db_accessor().db().updates_clients().Update(address().worker_id(), delta); switch (result) { - case distributed::RemoteUpdateResult::DONE: + case distributed::UpdateResult::DONE: break; - case distributed::RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: + case distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: throw query::RemoveAttachedVertexException(); - case distributed::RemoteUpdateResult::SERIALIZATION_ERROR: + case distributed::UpdateResult::SERIALIZATION_ERROR: throw mvcc::SerializationError(); - case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR: + case distributed::UpdateResult::UPDATE_DELETED_ERROR: throw RecordDeletedError(); - case distributed::RemoteUpdateResult::LOCK_TIMEOUT_ERROR: + case distributed::UpdateResult::LOCK_TIMEOUT_ERROR: throw LockTimeoutException("Lock timeout on remote worker"); } } diff --git a/tests/unit/distributed_common.hpp b/tests/unit/distributed_common.hpp index 1d6fa0a2a..1fd0e1446 100644 --- a/tests/unit/distributed_common.hpp +++ b/tests/unit/distributed_common.hpp @@ -5,7 +5,7 @@ #include "database/graph_db.hpp" #include "database/graph_db_accessor.hpp" -#include "distributed/remote_updates_rpc_server.hpp" +#include "distributed/updates_rpc_server.hpp" #include "storage/address_types.hpp" #include "transactions/engine_master.hpp" @@ -91,9 +91,9 @@ class DistributedGraphDbTest : public ::testing::Test { VertexAccessor to{to_addr, dba}; auto r_val = dba.InsertEdge(from, to, dba.EdgeType(edge_type_name)).GlobalAddress(); - master().remote_updates_server().Apply(dba.transaction_id()); - worker(1).remote_updates_server().Apply(dba.transaction_id()); - worker(2).remote_updates_server().Apply(dba.transaction_id()); + master().updates_server().Apply(dba.transaction_id()); + worker(1).updates_server().Apply(dba.transaction_id()); + worker(2).updates_server().Apply(dba.transaction_id()); dba.Commit(); return r_val; } diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp index 7d1709e29..308b0157a 100644 --- a/tests/unit/distributed_graph_db.cpp +++ b/tests/unit/distributed_graph_db.cpp @@ -8,11 +8,11 @@ #include "distributed/coordination.hpp" #include "distributed/coordination_master.hpp" #include "distributed/coordination_worker.hpp" +#include "distributed/data_rpc_clients.hpp" +#include "distributed/data_rpc_server.hpp" #include "distributed/plan_consumer.hpp" #include "distributed/plan_dispatcher.hpp" -#include "distributed/remote_data_rpc_clients.hpp" -#include "distributed/remote_data_rpc_server.hpp" -#include "distributed/remote_pull_rpc_clients.hpp" +#include "distributed/pull_rpc_clients.hpp" #include "distributed_common.hpp" #include "io/network/endpoint.hpp" #include "query/frontend/ast/ast.hpp" diff --git a/tests/unit/distributed_interpretation.cpp b/tests/unit/distributed_interpretation.cpp index 0ccabcbfd..c6f2aefb8 100644 --- a/tests/unit/distributed_interpretation.cpp +++ b/tests/unit/distributed_interpretation.cpp @@ -44,7 +44,7 @@ class DistributedInterpretationTest : public DistributedGraphDbTest { std::experimental::optional interpreter_; }; -TEST_F(DistributedInterpretationTest, RemotePullTest) { +TEST_F(DistributedInterpretationTest, PullTest) { auto results = Run("OPTIONAL MATCH(n) UNWIND(RANGE(0, 20)) AS X RETURN 1"); ASSERT_EQ(results.size(), 3 * 21); @@ -54,7 +54,7 @@ TEST_F(DistributedInterpretationTest, RemotePullTest) { } } -TEST_F(DistributedInterpretationTest, RemotePullNoResultsTest) { +TEST_F(DistributedInterpretationTest, PullNoResultsTest) { auto results = Run("MATCH (n) RETURN n"); ASSERT_EQ(results.size(), 0U); } diff --git a/tests/unit/distributed_query_plan.cpp b/tests/unit/distributed_query_plan.cpp index 0f248042f..b3a5a9f91 100644 --- a/tests/unit/distributed_query_plan.cpp +++ b/tests/unit/distributed_query_plan.cpp @@ -8,11 +8,11 @@ #include "distributed/coordination.hpp" #include "distributed/coordination_master.hpp" #include "distributed/coordination_worker.hpp" +#include "distributed/data_rpc_clients.hpp" +#include "distributed/data_rpc_server.hpp" #include "distributed/plan_consumer.hpp" #include "distributed/plan_dispatcher.hpp" -#include "distributed/remote_data_rpc_clients.hpp" -#include "distributed/remote_data_rpc_server.hpp" -#include "distributed/remote_pull_rpc_clients.hpp" +#include "distributed/pull_rpc_clients.hpp" #include "distributed_common.hpp" #include "io/network/endpoint.hpp" #include "query/frontend/ast/ast.hpp" @@ -31,7 +31,7 @@ DECLARE_int32(query_execution_time_sec); using namespace distributed; using namespace database; -TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { +TEST_F(DistributedGraphDbTest, PullProduceRpc) { GraphDbAccessor dba{master()}; Context ctx{dba}; SymbolGenerator symbol_generator{ctx.symbol_table_}; @@ -60,12 +60,11 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { std::vector symbols{ctx.symbol_table_[*x_ne]}; auto remote_pull = [this, ¶ms, &symbols](GraphDbAccessor &dba, int worker_id) { - return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id, - params, symbols, false, 3); + return master().pull_clients().Pull(dba, worker_id, plan_id, params, + symbols, false, 3); }; auto expect_first_batch = [](auto &batch) { - EXPECT_EQ(batch.pull_state, - distributed::RemotePullState::CURSOR_IN_PROGRESS); + EXPECT_EQ(batch.pull_state, distributed::PullState::CURSOR_IN_PROGRESS); ASSERT_EQ(batch.frames.size(), 3); ASSERT_EQ(batch.frames[0].size(), 1); EXPECT_EQ(batch.frames[0][0].ValueInt(), 42); @@ -73,7 +72,7 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { EXPECT_EQ(batch.frames[2][0].ValueString(), "bla"); }; auto expect_second_batch = [](auto &batch) { - EXPECT_EQ(batch.pull_state, distributed::RemotePullState::CURSOR_EXHAUSTED); + EXPECT_EQ(batch.pull_state, distributed::PullState::CURSOR_EXHAUSTED); ASSERT_EQ(batch.frames.size(), 2); ASSERT_EQ(batch.frames[0].size(), 1); EXPECT_EQ(batch.frames[0][0].ValueInt(), 1); @@ -95,7 +94,7 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { } } -TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) { +TEST_F(DistributedGraphDbTest, PullProduceRpcWithGraphElements) { // Create some data on the master and both workers. Eeach edge (3 of them) and // vertex (6 of them) will be uniquely identified with their worker id and // sequence ID, so we can check we retrieved all. @@ -180,8 +179,8 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) { ctx.symbol_table_[*return_m], p_sym}; auto remote_pull = [this, ¶ms, &symbols](GraphDbAccessor &dba, int worker_id) { - return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id, - params, symbols, false, 3); + return master().pull_clients().Pull(dba, worker_id, plan_id, params, + symbols, false, 3); }; auto future_w1_results = remote_pull(dba, 1); auto future_w2_results = remote_pull(dba, 2); @@ -352,13 +351,13 @@ TEST_F(DistributedTransactionTimeout, Timeout) { std::vector symbols{ctx.symbol_table_[*output]}; auto remote_pull = [this, ¶ms, &symbols, &dba]() { return master() - .remote_pull_clients() - .RemotePull(dba, 1, plan_id, params, symbols, false, 1) + .pull_clients() + .Pull(dba, 1, plan_id, params, symbols, false, 1) .get() .pull_state; }; - ASSERT_EQ(remote_pull(), distributed::RemotePullState::CURSOR_IN_PROGRESS); + ASSERT_EQ(remote_pull(), distributed::PullState::CURSOR_IN_PROGRESS); // Sleep here so the remote gets a hinted error. std::this_thread::sleep_for(2s); - EXPECT_EQ(remote_pull(), distributed::RemotePullState::HINTED_ABORT_ERROR); + EXPECT_EQ(remote_pull(), distributed::PullState::HINTED_ABORT_ERROR); } diff --git a/tests/unit/distributed_updates.cpp b/tests/unit/distributed_updates.cpp index c89a9d59d..14cd82f61 100644 --- a/tests/unit/distributed_updates.cpp +++ b/tests/unit/distributed_updates.cpp @@ -4,8 +4,8 @@ #include #include "database/graph_db_accessor.hpp" -#include "distributed/remote_updates_rpc_clients.hpp" -#include "distributed/remote_updates_rpc_server.hpp" +#include "distributed/updates_rpc_clients.hpp" +#include "distributed/updates_rpc_server.hpp" #include "query/typed_value.hpp" #include "storage/property_value.hpp" @@ -53,14 +53,14 @@ class DistributedUpdateTest : public DistributedGraphDbTest { EXPECT_EQ(var->has_label(label), new_result); \ } -TEST_F(DistributedUpdateTest, RemoteUpdateLocalOnly) { +TEST_F(DistributedUpdateTest, UpdateLocalOnly) { EXPECT_LABEL(v1_dba2, false, true); EXPECT_LABEL(v1_dba1, false, false); } -TEST_F(DistributedUpdateTest, RemoteUpdateApply) { +TEST_F(DistributedUpdateTest, UpdateApply) { EXPECT_LABEL(v1_dba1, false, false); - worker(1).remote_updates_server().Apply(dba1->transaction_id()); + worker(1).updates_server().Apply(dba1->transaction_id()); EXPECT_LABEL(v1_dba1, false, true); } @@ -90,7 +90,7 @@ TEST_F(DistributedGraphDbTest, CreateVertexWithUpdate) { gid = v.gid(); prop = dba.Property("prop"); v.PropsSet(prop, 42); - worker(2).remote_updates_server().Apply(dba.transaction_id()); + worker(2).updates_server().Apply(dba.transaction_id()); dba.Commit(); } { @@ -119,7 +119,7 @@ TEST_F(DistributedGraphDbTest, CreateVertexWithData) { EXPECT_TRUE(v.has_label(l2)); EXPECT_EQ(v.PropsAt(prop).Value(), 42); - worker(2).remote_updates_server().Apply(dba.transaction_id()); + worker(2).updates_server().Apply(dba.transaction_id()); dba.Commit(); } { @@ -156,9 +156,8 @@ TEST_F(DistributedGraphDbTest, UpdateVertexRemoteAndLocal) { v_remote.add_label(l2); v_local.add_label(l1); - auto result = - worker(1).remote_updates_server().Apply(dba0.transaction_id()); - EXPECT_EQ(result, distributed::RemoteUpdateResult::DONE); + auto result = worker(1).updates_server().Apply(dba0.transaction_id()); + EXPECT_EQ(result, distributed::UpdateResult::DONE); } } @@ -172,7 +171,7 @@ TEST_F(DistributedGraphDbTest, AddSameLabelRemoteAndLocal) { auto l1 = dba1.Label("label"); v_remote.add_label(l1); v_local.add_label(l1); - worker(1).remote_updates_server().Apply(dba0.transaction_id()); + worker(1).updates_server().Apply(dba0.transaction_id()); dba0.Commit(); } { @@ -191,7 +190,7 @@ TEST_F(DistributedGraphDbTest, IndexGetsUpdatedRemotely) { label = dba0.Label("label"); VertexAccessor va(v_remote, dba0); va.add_label(label); - worker(1).remote_updates_server().Apply(dba0.transaction_id()); + worker(1).updates_server().Apply(dba0.transaction_id()); dba0.Commit(); } { @@ -208,8 +207,8 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteCommit) { auto v_remote = VertexAccessor(v_address, dba0); dba0.RemoveVertex(v_remote); EXPECT_TRUE(dba1.FindVertexOptional(v_address.gid(), true)); - EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()), - distributed::RemoteUpdateResult::DONE); + EXPECT_EQ(worker(1).updates_server().Apply(dba0.transaction_id()), + distributed::UpdateResult::DONE); EXPECT_FALSE(dba1.FindVertexOptional(v_address.gid(), true)); } @@ -222,8 +221,8 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteBothDelete) { auto v_remote = VertexAccessor(v_address, dba0); EXPECT_TRUE(dba1.RemoveVertex(v_local)); EXPECT_TRUE(dba0.RemoveVertex(v_remote)); - EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()), - distributed::RemoteUpdateResult::DONE); + EXPECT_EQ(worker(1).updates_server().Apply(dba0.transaction_id()), + distributed::UpdateResult::DONE); EXPECT_FALSE(dba1.FindVertexOptional(v_address.gid(), true)); } } @@ -237,8 +236,8 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteStillConnected) { database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()}; auto v_remote = VertexAccessor(v_address, dba0); dba0.RemoveVertex(v_remote); - EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()), - distributed::RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR); + EXPECT_EQ(worker(1).updates_server().Apply(dba0.transaction_id()), + distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR); EXPECT_TRUE(dba1.FindVertexOptional(v_address.gid(), true)); } { @@ -251,8 +250,8 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteStillConnected) { dba1.RemoveEdge(e_local); dba0.RemoveVertex(v_remote); - EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()), - distributed::RemoteUpdateResult::DONE); + EXPECT_EQ(worker(1).updates_server().Apply(dba0.transaction_id()), + distributed::UpdateResult::DONE); EXPECT_FALSE(dba1.FindVertexOptional(v_address.gid(), true)); } } @@ -287,9 +286,9 @@ class DistributedDetachDeleteTest : public DistributedGraphDbTest { accessor.DetachRemoveVertex(v_accessor); for (auto db_accessor : dba) { - ASSERT_EQ(db_accessor.get().db().remote_updates_server().Apply( + ASSERT_EQ(db_accessor.get().db().updates_server().Apply( dba[0].get().transaction_id()), - distributed::RemoteUpdateResult::DONE); + distributed::UpdateResult::DONE); } check_func(dba); @@ -380,9 +379,9 @@ class DistributedEdgeCreateTest : public DistributedGraphDbTest { for (auto &kv : props) edge.PropsSet(dba.Property(kv.first), kv.second); - master().remote_updates_server().Apply(dba.transaction_id()); - worker(1).remote_updates_server().Apply(dba.transaction_id()); - worker(2).remote_updates_server().Apply(dba.transaction_id()); + master().updates_server().Apply(dba.transaction_id()); + worker(1).updates_server().Apply(dba.transaction_id()); + worker(2).updates_server().Apply(dba.transaction_id()); dba.Commit(); } @@ -487,9 +486,9 @@ class DistributedEdgeRemoveTest : public DistributedGraphDbTest { database::GraphDbAccessor dba{db}; EdgeAccessor edge{edge_addr, dba}; dba.RemoveEdge(edge); - master().remote_updates_server().Apply(dba.transaction_id()); - worker(1).remote_updates_server().Apply(dba.transaction_id()); - worker(2).remote_updates_server().Apply(dba.transaction_id()); + master().updates_server().Apply(dba.transaction_id()); + worker(1).updates_server().Apply(dba.transaction_id()); + worker(2).updates_server().Apply(dba.transaction_id()); dba.Commit(); }