Tidyup distributed stuff naming

Summary:
Remove "produce_" and "Produce" as prefix from all distributed stuff.
It's not removed in src/query/ stuff (operators).

Reviewers: dgleich

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1315
This commit is contained in:
florijan 2018-03-23 15:21:46 +01:00
parent 645e3bbc23
commit ac8c96ccc2
37 changed files with 745 additions and 808 deletions

View File

@ -19,14 +19,14 @@ set(memgraph_src_files
distributed/index_rpc_server.cpp distributed/index_rpc_server.cpp
distributed/plan_consumer.cpp distributed/plan_consumer.cpp
distributed/plan_dispatcher.cpp distributed/plan_dispatcher.cpp
distributed/remote_cache.cpp distributed/cache.cpp
distributed/remote_data_manager.cpp distributed/data_manager.cpp
distributed/remote_data_rpc_clients.cpp distributed/data_rpc_clients.cpp
distributed/remote_data_rpc_server.cpp distributed/data_rpc_server.cpp
distributed/remote_produce_rpc_server.cpp distributed/produce_rpc_server.cpp
distributed/remote_pull_rpc_clients.cpp distributed/pull_rpc_clients.cpp
distributed/remote_updates_rpc_clients.cpp distributed/updates_rpc_clients.cpp
distributed/remote_updates_rpc_server.cpp distributed/updates_rpc_server.cpp
durability/paths.cpp durability/paths.cpp
durability/recovery.cpp durability/recovery.cpp
durability/snapshooter.cpp durability/snapshooter.cpp

View File

@ -6,11 +6,11 @@
#include "database/state_delta.hpp" #include "database/state_delta.hpp"
#include "distributed/coordination_rpc_messages.hpp" #include "distributed/coordination_rpc_messages.hpp"
#include "distributed/data_rpc_messages.hpp"
#include "distributed/index_rpc_messages.hpp" #include "distributed/index_rpc_messages.hpp"
#include "distributed/plan_rpc_messages.hpp" #include "distributed/plan_rpc_messages.hpp"
#include "distributed/remote_data_rpc_messages.hpp" #include "distributed/pull_produce_rpc_messages.hpp"
#include "distributed/remote_pull_produce_rpc_messages.hpp" #include "distributed/updates_rpc_messages.hpp"
#include "distributed/remote_updates_rpc_messages.hpp"
#include "stats/stats_rpc_messages.hpp" #include "stats/stats_rpc_messages.hpp"
#include "storage/concurrent_id_mapper_rpc_messages.hpp" #include "storage/concurrent_id_mapper_rpc_messages.hpp"
#include "transactions/engine_rpc_messages.hpp" #include "transactions/engine_rpc_messages.hpp"
@ -59,10 +59,10 @@ BOOST_CLASS_EXPORT(distributed::StopWorkerReq);
BOOST_CLASS_EXPORT(distributed::StopWorkerRes); BOOST_CLASS_EXPORT(distributed::StopWorkerRes);
// Distributed data exchange. // Distributed data exchange.
BOOST_CLASS_EXPORT(distributed::RemoteEdgeReq); BOOST_CLASS_EXPORT(distributed::EdgeReq);
BOOST_CLASS_EXPORT(distributed::RemoteEdgeRes); BOOST_CLASS_EXPORT(distributed::EdgeRes);
BOOST_CLASS_EXPORT(distributed::RemoteVertexReq); BOOST_CLASS_EXPORT(distributed::VertexReq);
BOOST_CLASS_EXPORT(distributed::RemoteVertexRes); BOOST_CLASS_EXPORT(distributed::VertexRes);
BOOST_CLASS_EXPORT(distributed::TxGidPair); BOOST_CLASS_EXPORT(distributed::TxGidPair);
// Distributed plan exchange. // Distributed plan exchange.
@ -71,9 +71,9 @@ BOOST_CLASS_EXPORT(distributed::DispatchPlanRes);
BOOST_CLASS_EXPORT(distributed::RemovePlanReq); BOOST_CLASS_EXPORT(distributed::RemovePlanReq);
BOOST_CLASS_EXPORT(distributed::RemovePlanRes); BOOST_CLASS_EXPORT(distributed::RemovePlanRes);
// Remote pull. // Pull.
BOOST_CLASS_EXPORT(distributed::RemotePullReq); BOOST_CLASS_EXPORT(distributed::PullReq);
BOOST_CLASS_EXPORT(distributed::RemotePullRes); BOOST_CLASS_EXPORT(distributed::PullRes);
BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedReq); BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedReq);
BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedRes); BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedRes);
@ -88,30 +88,30 @@ BOOST_CLASS_EXPORT(stats::StatsRes);
BOOST_CLASS_EXPORT(stats::BatchStatsReq); BOOST_CLASS_EXPORT(stats::BatchStatsReq);
BOOST_CLASS_EXPORT(stats::BatchStatsRes); BOOST_CLASS_EXPORT(stats::BatchStatsRes);
// Remote updates. // Updates.
BOOST_CLASS_EXPORT(database::StateDelta); BOOST_CLASS_EXPORT(database::StateDelta);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateReq); BOOST_CLASS_EXPORT(distributed::UpdateReq);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateRes); BOOST_CLASS_EXPORT(distributed::UpdateRes);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyReq); BOOST_CLASS_EXPORT(distributed::UpdateApplyReq);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyRes); BOOST_CLASS_EXPORT(distributed::UpdateApplyRes);
// Remote creates. // Creates.
BOOST_CLASS_EXPORT(distributed::RemoteCreateResult); BOOST_CLASS_EXPORT(distributed::CreateResult);
BOOST_CLASS_EXPORT(distributed::RemoteCreateVertexReq); BOOST_CLASS_EXPORT(distributed::CreateVertexReq);
BOOST_CLASS_EXPORT(distributed::RemoteCreateVertexReqData); BOOST_CLASS_EXPORT(distributed::CreateVertexReqData);
BOOST_CLASS_EXPORT(distributed::RemoteCreateVertexRes); BOOST_CLASS_EXPORT(distributed::CreateVertexRes);
BOOST_CLASS_EXPORT(distributed::RemoteCreateEdgeReqData); BOOST_CLASS_EXPORT(distributed::CreateEdgeReqData);
BOOST_CLASS_EXPORT(distributed::RemoteCreateEdgeReq); BOOST_CLASS_EXPORT(distributed::CreateEdgeReq);
BOOST_CLASS_EXPORT(distributed::RemoteCreateEdgeRes); BOOST_CLASS_EXPORT(distributed::CreateEdgeRes);
BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeReqData); BOOST_CLASS_EXPORT(distributed::AddInEdgeReqData);
BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeReq); BOOST_CLASS_EXPORT(distributed::AddInEdgeReq);
BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeRes); BOOST_CLASS_EXPORT(distributed::AddInEdgeRes);
// Remote removal. // Removes.
BOOST_CLASS_EXPORT(distributed::RemoteRemoveVertexReq); BOOST_CLASS_EXPORT(distributed::RemoveVertexReq);
BOOST_CLASS_EXPORT(distributed::RemoteRemoveVertexRes); BOOST_CLASS_EXPORT(distributed::RemoveVertexRes);
BOOST_CLASS_EXPORT(distributed::RemoteRemoveEdgeReq); BOOST_CLASS_EXPORT(distributed::RemoveEdgeReq);
BOOST_CLASS_EXPORT(distributed::RemoteRemoveEdgeRes); BOOST_CLASS_EXPORT(distributed::RemoveEdgeRes);
BOOST_CLASS_EXPORT(distributed::RemoteRemoveInEdgeData); BOOST_CLASS_EXPORT(distributed::RemoveInEdgeData);
BOOST_CLASS_EXPORT(distributed::RemoteRemoveInEdgeReq); BOOST_CLASS_EXPORT(distributed::RemoveInEdgeReq);
BOOST_CLASS_EXPORT(distributed::RemoteRemoveInEdgeRes); BOOST_CLASS_EXPORT(distributed::RemoveInEdgeRes);

View File

@ -4,17 +4,17 @@
#include "database/graph_db.hpp" #include "database/graph_db.hpp"
#include "distributed/coordination_master.hpp" #include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp" #include "distributed/coordination_worker.hpp"
#include "distributed/data_manager.hpp"
#include "distributed/data_rpc_clients.hpp"
#include "distributed/data_rpc_server.hpp"
#include "distributed/index_rpc_server.hpp" #include "distributed/index_rpc_server.hpp"
#include "distributed/plan_consumer.hpp" #include "distributed/plan_consumer.hpp"
#include "distributed/plan_dispatcher.hpp" #include "distributed/plan_dispatcher.hpp"
#include "distributed/remote_data_manager.hpp" #include "distributed/produce_rpc_server.hpp"
#include "distributed/remote_data_rpc_clients.hpp" #include "distributed/pull_rpc_clients.hpp"
#include "distributed/remote_data_rpc_server.hpp"
#include "distributed/remote_produce_rpc_server.hpp"
#include "distributed/remote_pull_rpc_clients.hpp"
#include "distributed/remote_updates_rpc_clients.hpp"
#include "distributed/remote_updates_rpc_server.hpp"
#include "distributed/transactional_cache_cleaner.hpp" #include "distributed/transactional_cache_cleaner.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "distributed/updates_rpc_server.hpp"
#include "durability/paths.hpp" #include "durability/paths.hpp"
#include "durability/recovery.hpp" #include "durability/recovery.hpp"
#include "durability/snapshooter.hpp" #include "durability/snapshooter.hpp"
@ -42,10 +42,10 @@ class PrivateBase : public GraphDb {
durability::WriteAheadLog &wal() override { return wal_; } durability::WriteAheadLog &wal() override { return wal_; }
int WorkerId() const override { return config_.worker_id; } int WorkerId() const override { return config_.worker_id; }
distributed::RemotePullRpcClients &remote_pull_clients() override { distributed::PullRpcClients &pull_clients() override {
LOG(FATAL) << "Remote pull clients only available in master."; LOG(FATAL) << "Remote pull clients only available in master.";
} }
distributed::RemoteProduceRpcServer &remote_produce_server() override { distributed::ProduceRpcServer &produce_server() override {
LOG(FATAL) << "Remote produce server only available in worker."; LOG(FATAL) << "Remote produce server only available in worker.";
} }
distributed::PlanConsumer &plan_consumer() override { distributed::PlanConsumer &plan_consumer() override {
@ -101,10 +101,10 @@ class SingleNode : public PrivateBase {
TypemapPack<SingleNodeConcurrentIdMapper> typemap_pack_; TypemapPack<SingleNodeConcurrentIdMapper> typemap_pack_;
database::SingleNodeCounters counters_; database::SingleNodeCounters counters_;
std::vector<int> GetWorkerIds() const override { return {0}; } std::vector<int> GetWorkerIds() const override { return {0}; }
distributed::RemoteDataRpcServer &remote_data_server() override { distributed::DataRpcServer &data_server() override {
LOG(FATAL) << "Remote data server not available in single-node."; LOG(FATAL) << "Remote data server not available in single-node.";
} }
distributed::RemoteDataRpcClients &remote_data_clients() override { distributed::DataRpcClients &data_clients() override {
LOG(FATAL) << "Remote data clients not available in single-node."; LOG(FATAL) << "Remote data clients not available in single-node.";
} }
distributed::PlanDispatcher &plan_dispatcher() override { distributed::PlanDispatcher &plan_dispatcher() override {
@ -113,42 +113,38 @@ class SingleNode : public PrivateBase {
distributed::PlanConsumer &plan_consumer() override { distributed::PlanConsumer &plan_consumer() override {
LOG(FATAL) << "Plan Consumer not available in single-node."; LOG(FATAL) << "Plan Consumer not available in single-node.";
} }
distributed::RemoteUpdatesRpcServer &remote_updates_server() override { distributed::UpdatesRpcServer &updates_server() override {
LOG(FATAL) << "Remote updates server not available in single-node."; LOG(FATAL) << "Remote updates server not available in single-node.";
} }
distributed::RemoteUpdatesRpcClients &remote_updates_clients() override { distributed::UpdatesRpcClients &updates_clients() override {
LOG(FATAL) << "Remote updates clients not available in single-node."; LOG(FATAL) << "Remote updates clients not available in single-node.";
} }
distributed::RemoteDataManager &remote_data_manager() override { distributed::DataManager &data_manager() override {
LOG(FATAL) << "Remote data manager not available in single-node."; LOG(FATAL) << "Remote data manager not available in single-node.";
} }
}; };
#define IMPL_DISTRIBUTED_GETTERS \ #define IMPL_DISTRIBUTED_GETTERS \
std::vector<int> GetWorkerIds() const override { \ std::vector<int> GetWorkerIds() const override { \
return coordination_.GetWorkerIds(); \ return coordination_.GetWorkerIds(); \
} \ } \
distributed::RemoteDataRpcServer &remote_data_server() override { \ distributed::DataRpcServer &data_server() override { return data_server_; } \
return remote_data_server_; \ distributed::DataRpcClients &data_clients() override { \
} \ return data_clients_; \
distributed::RemoteDataRpcClients &remote_data_clients() override { \ } \
return remote_data_clients_; \ distributed::UpdatesRpcServer &updates_server() override { \
} \ return updates_server_; \
distributed::RemoteUpdatesRpcServer &remote_updates_server() override { \ } \
return remote_updates_server_; \ distributed::UpdatesRpcClients &updates_clients() override { \
} \ return updates_clients_; \
distributed::RemoteUpdatesRpcClients &remote_updates_clients() override { \ } \
return remote_updates_clients_; \ distributed::DataManager &data_manager() override { return data_manager_; }
} \
distributed::RemoteDataManager &remote_data_manager() override { \
return remote_data_manager_; \
}
class Master : public PrivateBase { class Master : public PrivateBase {
public: public:
explicit Master(const Config &config) : PrivateBase(config) { explicit Master(const Config &config) : PrivateBase(config) {
cache_cleaner_.Register(remote_updates_server_); cache_cleaner_.Register(updates_server_);
cache_cleaner_.Register(remote_data_manager_); cache_cleaner_.Register(data_manager_);
} }
GraphDb::Type type() const override { GraphDb::Type type() const override {
@ -159,9 +155,7 @@ class Master : public PrivateBase {
distributed::PlanDispatcher &plan_dispatcher() override { distributed::PlanDispatcher &plan_dispatcher() override {
return plan_dispatcher_; return plan_dispatcher_;
} }
distributed::RemotePullRpcClients &remote_pull_clients() override { distributed::PullRpcClients &pull_clients() override { return pull_clients_; }
return remote_pull_clients_;
}
distributed::IndexRpcClients &index_rpc_clients() override { distributed::IndexRpcClients &index_rpc_clients() override {
return index_rpc_clients_; return index_rpc_clients_;
} }
@ -181,16 +175,14 @@ class Master : public PrivateBase {
distributed::RpcWorkerClients rpc_worker_clients_{coordination_}; distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
TypemapPack<MasterConcurrentIdMapper> typemap_pack_{server_}; TypemapPack<MasterConcurrentIdMapper> typemap_pack_{server_};
database::MasterCounters counters_{server_}; database::MasterCounters counters_{server_};
distributed::RemoteDataRpcServer remote_data_server_{*this, server_}; distributed::DataRpcServer data_server_{*this, server_};
distributed::RemoteDataRpcClients remote_data_clients_{rpc_worker_clients_}; distributed::DataRpcClients data_clients_{rpc_worker_clients_};
distributed::PlanDispatcher plan_dispatcher_{rpc_worker_clients_}; distributed::PlanDispatcher plan_dispatcher_{rpc_worker_clients_};
distributed::RemotePullRpcClients remote_pull_clients_{rpc_worker_clients_}; distributed::PullRpcClients pull_clients_{rpc_worker_clients_};
distributed::IndexRpcClients index_rpc_clients_{rpc_worker_clients_}; distributed::IndexRpcClients index_rpc_clients_{rpc_worker_clients_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, server_}; distributed::UpdatesRpcServer updates_server_{*this, server_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{ distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
rpc_worker_clients_}; distributed::DataManager data_manager_{storage_, data_clients_};
distributed::RemoteDataManager remote_data_manager_{storage_,
remote_data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_}; distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_};
}; };
@ -199,9 +191,9 @@ class Worker : public PrivateBase {
explicit Worker(const Config &config) : PrivateBase(config) { explicit Worker(const Config &config) : PrivateBase(config) {
coordination_.RegisterWorker(config.worker_id); coordination_.RegisterWorker(config.worker_id);
cache_cleaner_.Register(tx_engine_); cache_cleaner_.Register(tx_engine_);
cache_cleaner_.Register(remote_produce_server_); cache_cleaner_.Register(produce_server_);
cache_cleaner_.Register(remote_updates_server_); cache_cleaner_.Register(updates_server_);
cache_cleaner_.Register(remote_data_manager_); cache_cleaner_.Register(data_manager_);
} }
GraphDb::Type type() const override { GraphDb::Type type() const override {
@ -210,8 +202,8 @@ class Worker : public PrivateBase {
IMPL_GETTERS IMPL_GETTERS
IMPL_DISTRIBUTED_GETTERS IMPL_DISTRIBUTED_GETTERS
distributed::PlanConsumer &plan_consumer() override { return plan_consumer_; } distributed::PlanConsumer &plan_consumer() override { return plan_consumer_; }
distributed::RemoteProduceRpcServer &remote_produce_server() override { distributed::ProduceRpcServer &produce_server() override {
return remote_produce_server_; return produce_server_;
} }
~Worker() { ~Worker() {
@ -231,17 +223,15 @@ class Worker : public PrivateBase {
TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{ TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{
rpc_worker_clients_.GetClientPool(0)}; rpc_worker_clients_.GetClientPool(0)};
database::WorkerCounters counters_{rpc_worker_clients_.GetClientPool(0)}; database::WorkerCounters counters_{rpc_worker_clients_.GetClientPool(0)};
distributed::RemoteDataRpcServer remote_data_server_{*this, server_}; distributed::DataRpcServer data_server_{*this, server_};
distributed::RemoteDataRpcClients remote_data_clients_{rpc_worker_clients_}; distributed::DataRpcClients data_clients_{rpc_worker_clients_};
distributed::PlanConsumer plan_consumer_{server_}; distributed::PlanConsumer plan_consumer_{server_};
distributed::RemoteProduceRpcServer remote_produce_server_{ distributed::ProduceRpcServer produce_server_{*this, tx_engine_, server_,
*this, tx_engine_, server_, plan_consumer_}; plan_consumer_};
distributed::IndexRpcServer index_rpc_server_{*this, server_}; distributed::IndexRpcServer index_rpc_server_{*this, server_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, server_}; distributed::UpdatesRpcServer updates_server_{*this, server_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{ distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
rpc_worker_clients_}; distributed::DataManager data_manager_{storage_, data_clients_};
distributed::RemoteDataManager remote_data_manager_{storage_,
remote_data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_}; distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_};
}; };
@ -311,11 +301,11 @@ int PublicBase::WorkerId() const { return impl_->WorkerId(); }
std::vector<int> PublicBase::GetWorkerIds() const { std::vector<int> PublicBase::GetWorkerIds() const {
return impl_->GetWorkerIds(); return impl_->GetWorkerIds();
} }
distributed::RemoteDataRpcServer &PublicBase::remote_data_server() { distributed::DataRpcServer &PublicBase::data_server() {
return impl_->remote_data_server(); return impl_->data_server();
} }
distributed::RemoteDataRpcClients &PublicBase::remote_data_clients() { distributed::DataRpcClients &PublicBase::data_clients() {
return impl_->remote_data_clients(); return impl_->data_clients();
} }
distributed::PlanDispatcher &PublicBase::plan_dispatcher() { distributed::PlanDispatcher &PublicBase::plan_dispatcher() {
return impl_->plan_dispatcher(); return impl_->plan_dispatcher();
@ -326,20 +316,20 @@ distributed::IndexRpcClients &PublicBase::index_rpc_clients() {
distributed::PlanConsumer &PublicBase::plan_consumer() { distributed::PlanConsumer &PublicBase::plan_consumer() {
return impl_->plan_consumer(); return impl_->plan_consumer();
} }
distributed::RemotePullRpcClients &PublicBase::remote_pull_clients() { distributed::PullRpcClients &PublicBase::pull_clients() {
return impl_->remote_pull_clients(); return impl_->pull_clients();
} }
distributed::RemoteProduceRpcServer &PublicBase::remote_produce_server() { distributed::ProduceRpcServer &PublicBase::produce_server() {
return impl_->remote_produce_server(); return impl_->produce_server();
} }
distributed::RemoteUpdatesRpcServer &PublicBase::remote_updates_server() { distributed::UpdatesRpcServer &PublicBase::updates_server() {
return impl_->remote_updates_server(); return impl_->updates_server();
} }
distributed::RemoteUpdatesRpcClients &PublicBase::remote_updates_clients() { distributed::UpdatesRpcClients &PublicBase::updates_clients() {
return impl_->remote_updates_clients(); return impl_->updates_clients();
} }
distributed::RemoteDataManager &PublicBase::remote_data_manager() { distributed::DataManager &PublicBase::data_manager() {
return impl_->remote_data_manager(); return impl_->data_manager();
} }
void PublicBase::MakeSnapshot() { void PublicBase::MakeSnapshot() {

View File

@ -15,15 +15,15 @@
#include "utils/scheduler.hpp" #include "utils/scheduler.hpp"
namespace distributed { namespace distributed {
class RemoteDataRpcServer; class DataRpcServer;
class RemoteDataRpcClients; class DataRpcClients;
class PlanDispatcher; class PlanDispatcher;
class PlanConsumer; class PlanConsumer;
class RemotePullRpcClients; class PullRpcClients;
class RemoteProduceRpcServer; class ProduceRpcServer;
class RemoteUpdatesRpcServer; class UpdatesRpcServer;
class RemoteUpdatesRpcClients; class UpdatesRpcClients;
class RemoteDataManager; class DataManager;
class IndexRpcClients; class IndexRpcClients;
} }
@ -95,20 +95,20 @@ class GraphDb {
virtual std::vector<int> GetWorkerIds() const = 0; virtual std::vector<int> GetWorkerIds() const = 0;
// Supported only in distributed master and worker, not in single-node. // Supported only in distributed master and worker, not in single-node.
virtual distributed::RemoteDataRpcServer &remote_data_server() = 0; virtual distributed::DataRpcServer &data_server() = 0;
virtual distributed::RemoteDataRpcClients &remote_data_clients() = 0; virtual distributed::DataRpcClients &data_clients() = 0;
virtual distributed::RemoteUpdatesRpcServer &remote_updates_server() = 0; virtual distributed::UpdatesRpcServer &updates_server() = 0;
virtual distributed::RemoteUpdatesRpcClients &remote_updates_clients() = 0; virtual distributed::UpdatesRpcClients &updates_clients() = 0;
virtual distributed::RemoteDataManager &remote_data_manager() = 0; virtual distributed::DataManager &data_manager() = 0;
// Supported only in distributed master. // Supported only in distributed master.
virtual distributed::RemotePullRpcClients &remote_pull_clients() = 0; virtual distributed::PullRpcClients &pull_clients() = 0;
virtual distributed::PlanDispatcher &plan_dispatcher() = 0; virtual distributed::PlanDispatcher &plan_dispatcher() = 0;
virtual distributed::IndexRpcClients &index_rpc_clients() = 0; virtual distributed::IndexRpcClients &index_rpc_clients() = 0;
// Supported only in distributed worker. // Supported only in distributed worker.
// TODO remove once end2end testing is possible. // TODO remove once end2end testing is possible.
virtual distributed::RemoteProduceRpcServer &remote_produce_server() = 0; virtual distributed::ProduceRpcServer &produce_server() = 0;
virtual distributed::PlanConsumer &plan_consumer() = 0; virtual distributed::PlanConsumer &plan_consumer() = 0;
GraphDb(const GraphDb &) = delete; GraphDb(const GraphDb &) = delete;
@ -138,16 +138,16 @@ class PublicBase : public GraphDb {
void CollectGarbage() override; void CollectGarbage() override;
int WorkerId() const override; int WorkerId() const override;
std::vector<int> GetWorkerIds() const override; std::vector<int> GetWorkerIds() const override;
distributed::RemoteDataRpcServer &remote_data_server() override; distributed::DataRpcServer &data_server() override;
distributed::RemoteDataRpcClients &remote_data_clients() override; distributed::DataRpcClients &data_clients() override;
distributed::PlanDispatcher &plan_dispatcher() override; distributed::PlanDispatcher &plan_dispatcher() override;
distributed::IndexRpcClients &index_rpc_clients() override; distributed::IndexRpcClients &index_rpc_clients() override;
distributed::PlanConsumer &plan_consumer() override; distributed::PlanConsumer &plan_consumer() override;
distributed::RemotePullRpcClients &remote_pull_clients() override; distributed::PullRpcClients &pull_clients() override;
distributed::RemoteProduceRpcServer &remote_produce_server() override; distributed::ProduceRpcServer &produce_server() override;
distributed::RemoteUpdatesRpcServer &remote_updates_server() override; distributed::UpdatesRpcServer &updates_server() override;
distributed::RemoteUpdatesRpcClients &remote_updates_clients() override; distributed::UpdatesRpcClients &updates_clients() override;
distributed::RemoteDataManager &remote_data_manager() override; distributed::DataManager &data_manager() override;
bool is_accepting_transactions() const { return is_accepting_transactions_; } bool is_accepting_transactions() const { return is_accepting_transactions_; }

View File

@ -5,9 +5,9 @@
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp" #include "database/state_delta.hpp"
#include "distributed/remote_data_manager.hpp" #include "distributed/data_manager.hpp"
#include "distributed/remote_updates_rpc_clients.hpp"
#include "distributed/rpc_worker_clients.hpp" #include "distributed/rpc_worker_clients.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "storage/address_types.hpp" #include "storage/address_types.hpp"
#include "storage/edge.hpp" #include "storage/edge.hpp"
#include "storage/edge_accessor.hpp" #include "storage/edge_accessor.hpp"
@ -85,14 +85,14 @@ VertexAccessor GraphDbAccessor::InsertVertexIntoRemote(
CHECK(worker_id != db().WorkerId()) CHECK(worker_id != db().WorkerId())
<< "Not allowed to call InsertVertexIntoRemote for local worker"; << "Not allowed to call InsertVertexIntoRemote for local worker";
gid::Gid gid = db().remote_updates_clients().RemoteCreateVertex( gid::Gid gid = db().updates_clients().CreateVertex(
worker_id, transaction_id(), labels, properties); worker_id, transaction_id(), labels, properties);
auto vertex = std::make_unique<Vertex>(); auto vertex = std::make_unique<Vertex>();
vertex->labels_ = labels; vertex->labels_ = labels;
for (auto &kv : properties) vertex->properties_.set(kv.first, kv.second); for (auto &kv : properties) vertex->properties_.set(kv.first, kv.second);
db().remote_data_manager() db().data_manager()
.Elements<Vertex>(transaction_id()) .Elements<Vertex>(transaction_id())
.emplace(gid, nullptr, std::move(vertex)); .emplace(gid, nullptr, std::move(vertex));
return VertexAccessor({gid, worker_id}, *this); return VertexAccessor({gid, worker_id}, *this);
@ -340,8 +340,8 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor,
if (!vertex_accessor.is_local()) { if (!vertex_accessor.is_local()) {
auto address = vertex_accessor.address(); auto address = vertex_accessor.address();
db().remote_updates_clients().RemoteRemoveVertex( db().updates_clients().RemoveVertex(address.worker_id(), transaction_id(),
address.worker_id(), transaction_id(), address.gid(), check_empty); address.gid(), check_empty);
// We can't know if we are going to be able to remove vertex until deferred // We can't know if we are going to be able to remove vertex until deferred
// updates on a remote worker are executed // updates on a remote worker are executed
return true; return true;
@ -411,15 +411,15 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
EdgeTypeName(edge_type))); EdgeTypeName(edge_type)));
} else { } else {
edge_address = db().remote_updates_clients().RemoteCreateEdge( edge_address = db().updates_clients().CreateEdge(transaction_id(), from, to,
transaction_id(), from, to, edge_type); edge_type);
from_updated = db().remote_data_manager() from_updated = db().data_manager()
.Elements<Vertex>(transaction_id()) .Elements<Vertex>(transaction_id())
.FindNew(from.gid()); .FindNew(from.gid());
// Create an Edge and insert it into the RemoteCache so we see it locally. // Create an Edge and insert it into the Cache so we see it locally.
db().remote_data_manager() db().data_manager()
.Elements<Edge>(transaction_id()) .Elements<Edge>(transaction_id())
.emplace( .emplace(
edge_address.gid(), nullptr, edge_address.gid(), nullptr,
@ -440,11 +440,11 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
// The RPC call for the `to` side is already handled if `from` is not local. // The RPC call for the `to` side is already handled if `from` is not local.
if (from.is_local() || if (from.is_local() ||
from.address().worker_id() != to.address().worker_id()) { from.address().worker_id() != to.address().worker_id()) {
db().remote_updates_clients().RemoteAddInEdge( db().updates_clients().AddInEdge(
transaction_id(), from, transaction_id(), from,
db().storage().GlobalizedAddress(edge_address), to, edge_type); db().storage().GlobalizedAddress(edge_address), to, edge_type);
} }
to_updated = db().remote_data_manager() to_updated = db().data_manager()
.Elements<Vertex>(transaction_id()) .Elements<Vertex>(transaction_id())
.FindNew(to.gid()); .FindNew(to.gid());
} }
@ -498,15 +498,15 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge,
CHECK(edge_addr.worker_id() == from_addr.worker_id()) CHECK(edge_addr.worker_id() == from_addr.worker_id())
<< "Edge and it's 'from' vertex not on the same worker"; << "Edge and it's 'from' vertex not on the same worker";
auto to_addr = db().storage().GlobalizedAddress(edge.to_addr()); auto to_addr = db().storage().GlobalizedAddress(edge.to_addr());
db().remote_updates_clients().RemoteRemoveEdge( db().updates_clients().RemoveEdge(transaction_id(), edge_addr.worker_id(),
transaction_id(), edge_addr.worker_id(), edge_addr.gid(), edge_addr.gid(), from_addr.gid(),
from_addr.gid(), to_addr); to_addr);
// Another RPC is necessary only if the first did not handle vertices on // Another RPC is necessary only if the first did not handle vertices on
// both sides. // both sides.
if (edge_addr.worker_id() != to_addr.worker_id()) { if (edge_addr.worker_id() != to_addr.worker_id()) {
db().remote_updates_clients().RemoteRemoveInEdge( db().updates_clients().RemoveInEdge(transaction_id(), to_addr.worker_id(),
transaction_id(), to_addr.worker_id(), to_addr.gid(), edge_addr); to_addr.gid(), edge_addr);
} }
} }
} }

View File

@ -9,7 +9,7 @@
#include "glog/logging.h" #include "glog/logging.h"
#include "database/graph_db.hpp" #include "database/graph_db.hpp"
#include "distributed/remote_cache.hpp" #include "distributed/cache.hpp"
#include "query/typed_value.hpp" #include "query/typed_value.hpp"
#include "storage/address_types.hpp" #include "storage/address_types.hpp"
#include "storage/edge_accessor.hpp" #include "storage/edge_accessor.hpp"

View File

@ -2,14 +2,14 @@
#include "glog/logging.h" #include "glog/logging.h"
#include "database/storage.hpp" #include "database/storage.hpp"
#include "distributed/remote_cache.hpp" #include "distributed/cache.hpp"
#include "storage/edge.hpp" #include "storage/edge.hpp"
#include "storage/vertex.hpp" #include "storage/vertex.hpp"
namespace distributed { namespace distributed {
template <typename TRecord> template <typename TRecord>
TRecord *RemoteCache<TRecord>::FindNew(gid::Gid gid) { TRecord *Cache<TRecord>::FindNew(gid::Gid gid) {
std::lock_guard<std::mutex> guard{lock_}; std::lock_guard<std::mutex> guard{lock_};
auto found = cache_.find(gid); auto found = cache_.find(gid);
DCHECK(found != cache_.end()) DCHECK(found != cache_.end())
@ -22,10 +22,9 @@ TRecord *RemoteCache<TRecord>::FindNew(gid::Gid gid) {
} }
template <typename TRecord> template <typename TRecord>
void RemoteCache<TRecord>::FindSetOldNew(tx::transaction_id_t tx_id, void Cache<TRecord>::FindSetOldNew(tx::transaction_id_t tx_id, int worker_id,
int worker_id, gid::Gid gid, gid::Gid gid, TRecord *&old_record,
TRecord *&old_record, TRecord *&new_record) {
TRecord *&new_record) {
{ {
std::lock_guard<std::mutex> guard(lock_); std::lock_guard<std::mutex> guard(lock_);
auto found = cache_.find(gid); auto found = cache_.find(gid);
@ -36,8 +35,7 @@ void RemoteCache<TRecord>::FindSetOldNew(tx::transaction_id_t tx_id,
} }
} }
auto remote = auto remote = data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
remote_data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
LocalizeAddresses(*remote); LocalizeAddresses(*remote);
// This logic is a bit strange because we need to make sure that someone // This logic is a bit strange because we need to make sure that someone
@ -54,8 +52,8 @@ void RemoteCache<TRecord>::FindSetOldNew(tx::transaction_id_t tx_id,
} }
template <typename TRecord> template <typename TRecord>
void RemoteCache<TRecord>::emplace(gid::Gid gid, rec_uptr old_record, void Cache<TRecord>::emplace(gid::Gid gid, rec_uptr old_record,
rec_uptr new_record) { rec_uptr new_record) {
if (old_record) LocalizeAddresses(*old_record); if (old_record) LocalizeAddresses(*old_record);
if (new_record) LocalizeAddresses(*new_record); if (new_record) LocalizeAddresses(*new_record);
@ -71,13 +69,13 @@ void RemoteCache<TRecord>::emplace(gid::Gid gid, rec_uptr old_record,
} }
template <typename TRecord> template <typename TRecord>
void RemoteCache<TRecord>::ClearCache() { void Cache<TRecord>::ClearCache() {
std::lock_guard<std::mutex> guard{lock_}; std::lock_guard<std::mutex> guard{lock_};
cache_.clear(); cache_.clear();
} }
template <> template <>
void RemoteCache<Vertex>::LocalizeAddresses(Vertex &vertex) { void Cache<Vertex>::LocalizeAddresses(Vertex &vertex) {
auto localize_edges = [this](auto &edges) { auto localize_edges = [this](auto &edges) {
for (auto &element : edges) { for (auto &element : edges) {
element.vertex = storage_.LocalizedAddressIfPossible(element.vertex); element.vertex = storage_.LocalizedAddressIfPossible(element.vertex);
@ -90,12 +88,12 @@ void RemoteCache<Vertex>::LocalizeAddresses(Vertex &vertex) {
} }
template <> template <>
void RemoteCache<Edge>::LocalizeAddresses(Edge &edge) { void Cache<Edge>::LocalizeAddresses(Edge &edge) {
edge.from_ = storage_.LocalizedAddressIfPossible(edge.from_); edge.from_ = storage_.LocalizedAddressIfPossible(edge.from_);
edge.to_ = storage_.LocalizedAddressIfPossible(edge.to_); edge.to_ = storage_.LocalizedAddressIfPossible(edge.to_);
} }
template class RemoteCache<Vertex>; template class Cache<Vertex>;
template class RemoteCache<Edge>; template class Cache<Edge>;
} // namespace distributed } // namespace distributed

View File

@ -3,7 +3,7 @@
#include <mutex> #include <mutex>
#include <unordered_map> #include <unordered_map>
#include "distributed/remote_data_rpc_clients.hpp" #include "distributed/data_rpc_clients.hpp"
#include "storage/gid.hpp" #include "storage/gid.hpp"
namespace database { namespace database {
@ -16,19 +16,18 @@ namespace distributed {
* Used for caching Vertices and Edges that are stored on another worker in a * Used for caching Vertices and Edges that are stored on another worker in a
* distributed system. Maps global IDs to (old, new) Vertex/Edge pointer * distributed system. Maps global IDs to (old, new) Vertex/Edge pointer
* pairs. It is possible that either "old" or "new" are nullptrs, but at * pairs. It is possible that either "old" or "new" are nullptrs, but at
* least one must be not-null. The RemoteCache is the owner of TRecord * least one must be not-null. The Cache is the owner of TRecord
* objects it points to. * objects it points to.
* *
* @tparam TRecord - Edge or Vertex * @tparam TRecord - Edge or Vertex
*/ */
template <typename TRecord> template <typename TRecord>
class RemoteCache { class Cache {
using rec_uptr = std::unique_ptr<TRecord>; using rec_uptr = std::unique_ptr<TRecord>;
public: public:
RemoteCache(database::Storage &storage, Cache(database::Storage &storage, distributed::DataRpcClients &data_clients)
distributed::RemoteDataRpcClients &remote_data_clients) : storage_(storage), data_clients_(data_clients) {}
: storage_(storage), remote_data_clients_(remote_data_clients) {}
/// Returns the new data for the given ID. Creates it (as copy of old) if /// Returns the new data for the given ID. Creates it (as copy of old) if
/// necessary. /// necessary.
@ -51,7 +50,7 @@ class RemoteCache {
database::Storage &storage_; database::Storage &storage_;
std::mutex lock_; std::mutex lock_;
distributed::RemoteDataRpcClients &remote_data_clients_; distributed::DataRpcClients &data_clients_;
// TODO it'd be better if we had VertexData and EdgeData in here, as opposed // TODO it'd be better if we had VertexData and EdgeData in here, as opposed
// to Vertex and Edge. // to Vertex and Edge.
std::unordered_map<gid::Gid, std::pair<rec_uptr, rec_uptr>> cache_; std::unordered_map<gid::Gid, std::pair<rec_uptr, rec_uptr>> cache_;

View File

@ -0,0 +1,53 @@
#include "distributed/data_manager.hpp"
#include "database/storage.hpp"
namespace distributed {
template <typename TRecord>
Cache<TRecord> &DataManager::GetCache(CacheT<TRecord> &collection,
tx::transaction_id_t tx_id) {
auto access = collection.access();
auto found = access.find(tx_id);
if (found != access.end()) return found->second;
return access
.emplace(tx_id, std::make_tuple(tx_id),
std::make_tuple(std::ref(storage_), std::ref(data_clients_)))
.first->second;
}
template <>
Cache<Vertex> &DataManager::Elements<Vertex>(tx::transaction_id_t tx_id) {
return GetCache(vertices_caches_, tx_id);
}
template <>
Cache<Edge> &DataManager::Elements<Edge>(tx::transaction_id_t tx_id) {
return GetCache(edges_caches_, tx_id);
}
DataManager::DataManager(database::Storage &storage,
distributed::DataRpcClients &data_clients)
: storage_(storage), data_clients_(data_clients) {}
void DataManager::ClearCacheForSingleTransaction(tx::transaction_id_t tx_id) {
Elements<Vertex>(tx_id).ClearCache();
Elements<Edge>(tx_id).ClearCache();
}
void DataManager::ClearTransactionalCache(tx::transaction_id_t oldest_active) {
auto vertex_access = vertices_caches_.access();
for (auto &kv : vertex_access) {
if (kv.first < oldest_active) {
vertex_access.remove(kv.first);
}
}
auto edge_access = edges_caches_.access();
for (auto &kv : edge_access) {
if (kv.first < oldest_active) {
edge_access.remove(kv.first);
}
}
}
} // namespace distributed

View File

@ -1,8 +1,8 @@
#pragma once #pragma once
#include "data_structures/concurrent/concurrent_map.hpp" #include "data_structures/concurrent/concurrent_map.hpp"
#include "distributed/remote_cache.hpp" #include "distributed/cache.hpp"
#include "distributed/remote_data_rpc_clients.hpp" #include "distributed/data_rpc_clients.hpp"
#include "transactions/type.hpp" #include "transactions/type.hpp"
class Vertex; class Vertex;
@ -15,22 +15,22 @@ class Storage;
namespace distributed { namespace distributed {
/// Handles remote data caches for edges and vertices, per transaction. /// Handles remote data caches for edges and vertices, per transaction.
class RemoteDataManager { class DataManager {
template <typename TRecord> template <typename TRecord>
using CacheT = ConcurrentMap<tx::transaction_id_t, RemoteCache<TRecord>>; using CacheT = ConcurrentMap<tx::transaction_id_t, Cache<TRecord>>;
// Helper, gets or inserts a data cache for the given transaction. // Helper, gets or inserts a data cache for the given transaction.
template <typename TRecord> template <typename TRecord>
RemoteCache<TRecord> &GetCache(CacheT<TRecord> &collection, Cache<TRecord> &GetCache(CacheT<TRecord> &collection,
tx::transaction_id_t tx_id); tx::transaction_id_t tx_id);
public: public:
RemoteDataManager(database::Storage &storage, DataManager(database::Storage &storage,
distributed::RemoteDataRpcClients &remote_data_clients); distributed::DataRpcClients &data_clients);
/// Gets or creates the remote vertex/edge cache for the given transaction. /// Gets or creates the remote vertex/edge cache for the given transaction.
template <typename TRecord> template <typename TRecord>
RemoteCache<TRecord> &Elements(tx::transaction_id_t tx_id); Cache<TRecord> &Elements(tx::transaction_id_t tx_id);
/// Removes all the caches for a single transaction. /// Removes all the caches for a single transaction.
void ClearCacheForSingleTransaction(tx::transaction_id_t tx_id); void ClearCacheForSingleTransaction(tx::transaction_id_t tx_id);
@ -41,7 +41,7 @@ class RemoteDataManager {
private: private:
database::Storage &storage_; database::Storage &storage_;
RemoteDataRpcClients &remote_data_clients_; DataRpcClients &data_clients_;
CacheT<Vertex> vertices_caches_; CacheT<Vertex> vertices_caches_;
CacheT<Edge> edges_caches_; CacheT<Edge> edges_caches_;
}; };

View File

@ -0,0 +1,27 @@
#include "distributed/data_rpc_clients.hpp"
#include "distributed/data_rpc_messages.hpp"
#include "storage/edge.hpp"
#include "storage/vertex.hpp"
namespace distributed {
template <>
std::unique_ptr<Edge> DataRpcClients::RemoteElement(int worker_id,
tx::transaction_id_t tx_id,
gid::Gid gid) {
auto response =
clients_.GetClientPool(worker_id).Call<EdgeRpc>(TxGidPair{tx_id, gid});
CHECK(response) << "EdgeRpc failed";
return std::move(response->name_output_);
}
template <>
std::unique_ptr<Vertex> DataRpcClients::RemoteElement(
int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) {
auto response =
clients_.GetClientPool(worker_id).Call<VertexRpc>(TxGidPair{tx_id, gid});
CHECK(response) << "VertexRpc failed";
return std::move(response->name_output_);
}
} // namespace distributed

View File

@ -10,9 +10,9 @@
namespace distributed { namespace distributed {
/// Provides access to other worker's data. /// Provides access to other worker's data.
class RemoteDataRpcClients { class DataRpcClients {
public: public:
RemoteDataRpcClients(RpcWorkerClients &clients) : clients_(clients) {} DataRpcClients(RpcWorkerClients &clients) : clients_(clients) {}
/// Returns a remote worker's record (vertex/edge) data for the given params. /// Returns a remote worker's record (vertex/edge) data for the given params.
/// That worker must own the vertex/edge for the given id, and that vertex /// That worker must own the vertex/edge for the given id, and that vertex
/// must be visible in given transaction. /// must be visible in given transaction.

View File

@ -27,10 +27,10 @@ struct TxGidPair {
}; };
#define MAKE_RESPONSE(type, name) \ #define MAKE_RESPONSE(type, name) \
class Remote##type##Res : public communication::rpc::Message { \ class type##Res : public communication::rpc::Message { \
public: \ public: \
Remote##type##Res() {} \ type##Res() {} \
Remote##type##Res(const type *name, int worker_id) \ type##Res(const type *name, int worker_id) \
: name_input_(name), worker_id_(worker_id) {} \ : name_input_(name), worker_id_(worker_id) {} \
\ \
template <class TArchive> \ template <class TArchive> \
@ -59,12 +59,10 @@ MAKE_RESPONSE(Edge, edge)
#undef MAKE_RESPONSE #undef MAKE_RESPONSE
RPC_SINGLE_MEMBER_MESSAGE(RemoteVertexReq, TxGidPair); RPC_SINGLE_MEMBER_MESSAGE(VertexReq, TxGidPair);
RPC_SINGLE_MEMBER_MESSAGE(RemoteEdgeReq, TxGidPair); RPC_SINGLE_MEMBER_MESSAGE(EdgeReq, TxGidPair);
using RemoteVertexRpc = using VertexRpc = communication::rpc::RequestResponse<VertexReq, VertexRes>;
communication::rpc::RequestResponse<RemoteVertexReq, RemoteVertexRes>; using EdgeRpc = communication::rpc::RequestResponse<EdgeReq, EdgeRes>;
using RemoteEdgeRpc =
communication::rpc::RequestResponse<RemoteEdgeReq, RemoteEdgeRes>;
} // namespace distributed } // namespace distributed

View File

@ -0,0 +1,29 @@
#include <memory>
#include "data_rpc_server.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/data_rpc_messages.hpp"
namespace distributed {
DataRpcServer::DataRpcServer(database::GraphDb &db,
communication::rpc::Server &server)
: db_(db), rpc_server_(server) {
rpc_server_.Register<VertexRpc>(
[this](const VertexReq &req) {
database::GraphDbAccessor dba(db_, req.member.tx_id);
auto vertex = dba.FindVertex(req.member.gid, false);
CHECK(vertex.GetOld())
<< "Old record must exist when sending vertex by RPC";
return std::make_unique<VertexRes>(vertex.GetOld(), db_.WorkerId());
});
rpc_server_.Register<EdgeRpc>([this](const EdgeReq &req) {
database::GraphDbAccessor dba(db_, req.member.tx_id);
auto edge = dba.FindEdge(req.member.gid, false);
CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC";
return std::make_unique<EdgeRes>(edge.GetOld(), db_.WorkerId());
});
}
} // namespace distributed

View File

@ -6,10 +6,9 @@
namespace distributed { namespace distributed {
/// Serves this worker's data to others. /// Serves this worker's data to others.
class RemoteDataRpcServer { class DataRpcServer {
public: public:
RemoteDataRpcServer(database::GraphDb &db, DataRpcServer(database::GraphDb &db, communication::rpc::Server &server);
communication::rpc::Server &server);
private: private:
database::GraphDb &db_; database::GraphDb &db_;

View File

@ -1,13 +1,13 @@
#include "distributed/remote_produce_rpc_server.hpp" #include "distributed/produce_rpc_server.hpp"
#include "distributed/remote_data_manager.hpp" #include "distributed/data_manager.hpp"
#include "distributed/remote_pull_produce_rpc_messages.hpp" #include "distributed/pull_produce_rpc_messages.hpp"
#include "query/common.hpp" #include "query/common.hpp"
#include "query/exceptions.hpp" #include "query/exceptions.hpp"
#include "transactions/engine_worker.hpp" #include "transactions/engine_worker.hpp"
namespace distributed { namespace distributed {
RemoteProduceRpcServer::OngoingProduce::OngoingProduce( ProduceRpcServer::OngoingProduce::OngoingProduce(
database::GraphDb &db, tx::transaction_id_t tx_id, database::GraphDb &db, tx::transaction_id_t tx_id,
std::shared_ptr<query::plan::LogicalOperator> op, std::shared_ptr<query::plan::LogicalOperator> op,
query::SymbolTable symbol_table, Parameters parameters, query::SymbolTable symbol_table, Parameters parameters,
@ -21,8 +21,8 @@ RemoteProduceRpcServer::OngoingProduce::OngoingProduce(
context_.parameters_ = std::move(parameters); context_.parameters_ = std::move(parameters);
} }
std::pair<std::vector<query::TypedValue>, RemotePullState> std::pair<std::vector<query::TypedValue>, PullState>
RemoteProduceRpcServer::OngoingProduce::Pull() { ProduceRpcServer::OngoingProduce::Pull() {
if (!accumulation_.empty()) { if (!accumulation_.empty()) {
auto results = std::move(accumulation_.back()); auto results = std::move(accumulation_.back());
accumulation_.pop_back(); accumulation_.pop_back();
@ -30,35 +30,34 @@ RemoteProduceRpcServer::OngoingProduce::Pull() {
try { try {
query::ReconstructTypedValue(element); query::ReconstructTypedValue(element);
} catch (query::ReconstructionException &) { } catch (query::ReconstructionException &) {
cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR; cursor_state_ = PullState::RECONSTRUCTION_ERROR;
return std::make_pair(std::move(results), cursor_state_); return std::make_pair(std::move(results), cursor_state_);
} }
} }
return std::make_pair(std::move(results), return std::make_pair(std::move(results), PullState::CURSOR_IN_PROGRESS);
RemotePullState::CURSOR_IN_PROGRESS);
} }
return PullOneFromCursor(); return PullOneFromCursor();
} }
RemotePullState RemoteProduceRpcServer::OngoingProduce::Accumulate() { PullState ProduceRpcServer::OngoingProduce::Accumulate() {
while (true) { while (true) {
auto result = PullOneFromCursor(); auto result = PullOneFromCursor();
if (result.second != RemotePullState::CURSOR_IN_PROGRESS) if (result.second != PullState::CURSOR_IN_PROGRESS)
return result.second; return result.second;
else else
accumulation_.emplace_back(std::move(result.first)); accumulation_.emplace_back(std::move(result.first));
} }
} }
std::pair<std::vector<query::TypedValue>, RemotePullState> std::pair<std::vector<query::TypedValue>, PullState>
RemoteProduceRpcServer::OngoingProduce::PullOneFromCursor() { ProduceRpcServer::OngoingProduce::PullOneFromCursor() {
std::vector<query::TypedValue> results; std::vector<query::TypedValue> results;
// Check if we already exhausted this cursor (or it entered an error // Check if we already exhausted this cursor (or it entered an error
// state). This happens when we accumulate before normal pull. // state). This happens when we accumulate before normal pull.
if (cursor_state_ != RemotePullState::CURSOR_IN_PROGRESS) { if (cursor_state_ != PullState::CURSOR_IN_PROGRESS) {
return std::make_pair(results, cursor_state_); return std::make_pair(results, cursor_state_);
} }
@ -69,48 +68,47 @@ RemoteProduceRpcServer::OngoingProduce::PullOneFromCursor() {
results.emplace_back(std::move(frame_[symbol])); results.emplace_back(std::move(frame_[symbol]));
} }
} else { } else {
cursor_state_ = RemotePullState::CURSOR_EXHAUSTED; cursor_state_ = PullState::CURSOR_EXHAUSTED;
} }
} catch (const mvcc::SerializationError &) { } catch (const mvcc::SerializationError &) {
cursor_state_ = RemotePullState::SERIALIZATION_ERROR; cursor_state_ = PullState::SERIALIZATION_ERROR;
} catch (const LockTimeoutException &) { } catch (const LockTimeoutException &) {
cursor_state_ = RemotePullState::LOCK_TIMEOUT_ERROR; cursor_state_ = PullState::LOCK_TIMEOUT_ERROR;
} catch (const RecordDeletedError &) { } catch (const RecordDeletedError &) {
cursor_state_ = RemotePullState::UPDATE_DELETED_ERROR; cursor_state_ = PullState::UPDATE_DELETED_ERROR;
} catch (const query::ReconstructionException &) { } catch (const query::ReconstructionException &) {
cursor_state_ = RemotePullState::RECONSTRUCTION_ERROR; cursor_state_ = PullState::RECONSTRUCTION_ERROR;
} catch (const query::RemoveAttachedVertexException &) { } catch (const query::RemoveAttachedVertexException &) {
cursor_state_ = RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR; cursor_state_ = PullState::UNABLE_TO_DELETE_VERTEX_ERROR;
} catch (const query::QueryRuntimeException &) { } catch (const query::QueryRuntimeException &) {
cursor_state_ = RemotePullState::QUERY_ERROR; cursor_state_ = PullState::QUERY_ERROR;
} catch (const query::HintedAbortError &) { } catch (const query::HintedAbortError &) {
cursor_state_ = RemotePullState::HINTED_ABORT_ERROR; cursor_state_ = PullState::HINTED_ABORT_ERROR;
} }
return std::make_pair(std::move(results), cursor_state_); return std::make_pair(std::move(results), cursor_state_);
} }
RemoteProduceRpcServer::RemoteProduceRpcServer( ProduceRpcServer::ProduceRpcServer(
database::GraphDb &db, tx::Engine &tx_engine, database::GraphDb &db, tx::Engine &tx_engine,
communication::rpc::Server &server, communication::rpc::Server &server,
const distributed::PlanConsumer &plan_consumer) const distributed::PlanConsumer &plan_consumer)
: db_(db), : db_(db),
remote_produce_rpc_server_(server), produce_rpc_server_(server),
plan_consumer_(plan_consumer), plan_consumer_(plan_consumer),
tx_engine_(tx_engine) { tx_engine_(tx_engine) {
remote_produce_rpc_server_.Register<RemotePullRpc>( produce_rpc_server_.Register<PullRpc>([this](const PullReq &req) {
[this](const RemotePullReq &req) { return std::make_unique<PullRes>(Pull(req));
return std::make_unique<RemotePullRes>(RemotePull(req)); });
});
remote_produce_rpc_server_.Register<TransactionCommandAdvancedRpc>( produce_rpc_server_.Register<TransactionCommandAdvancedRpc>(
[this](const TransactionCommandAdvancedReq &req) { [this](const TransactionCommandAdvancedReq &req) {
tx_engine_.UpdateCommand(req.member); tx_engine_.UpdateCommand(req.member);
db_.remote_data_manager().ClearCacheForSingleTransaction(req.member); db_.data_manager().ClearCacheForSingleTransaction(req.member);
return std::make_unique<TransactionCommandAdvancedRes>(); return std::make_unique<TransactionCommandAdvancedRes>();
}); });
} }
void RemoteProduceRpcServer::ClearTransactionalCache( void ProduceRpcServer::ClearTransactionalCache(
tx::transaction_id_t oldest_active) { tx::transaction_id_t oldest_active) {
auto access = ongoing_produces_.access(); auto access = ongoing_produces_.access();
for (auto &kv : access) { for (auto &kv : access) {
@ -120,8 +118,8 @@ void RemoteProduceRpcServer::ClearTransactionalCache(
} }
} }
RemoteProduceRpcServer::OngoingProduce & ProduceRpcServer::OngoingProduce &ProduceRpcServer::GetOngoingProduce(
RemoteProduceRpcServer::GetOngoingProduce(const RemotePullReq &req) { const PullReq &req) {
auto access = ongoing_produces_.access(); auto access = ongoing_produces_.access();
auto key_pair = std::make_pair(req.tx_id, req.plan_id); auto key_pair = std::make_pair(req.tx_id, req.plan_id);
auto found = access.find(key_pair); auto found = access.find(key_pair);
@ -142,17 +140,16 @@ RemoteProduceRpcServer::GetOngoingProduce(const RemotePullReq &req) {
.first->second; .first->second;
} }
RemotePullResData RemoteProduceRpcServer::RemotePull(const RemotePullReq &req) { PullResData ProduceRpcServer::Pull(const PullReq &req) {
auto &ongoing_produce = GetOngoingProduce(req); auto &ongoing_produce = GetOngoingProduce(req);
RemotePullResData result{db_.WorkerId(), req.send_old, req.send_new}; PullResData result{db_.WorkerId(), req.send_old, req.send_new};
result.state_and_frames.pull_state = RemotePullState::CURSOR_IN_PROGRESS; result.state_and_frames.pull_state = PullState::CURSOR_IN_PROGRESS;
if (req.accumulate) { if (req.accumulate) {
result.state_and_frames.pull_state = ongoing_produce.Accumulate(); result.state_and_frames.pull_state = ongoing_produce.Accumulate();
// If an error ocurred, we need to return that error. // If an error ocurred, we need to return that error.
if (result.state_and_frames.pull_state != if (result.state_and_frames.pull_state != PullState::CURSOR_EXHAUSTED) {
RemotePullState::CURSOR_EXHAUSTED) {
return result; return result;
} }
} }
@ -160,7 +157,7 @@ RemotePullResData RemoteProduceRpcServer::RemotePull(const RemotePullReq &req) {
for (int i = 0; i < req.batch_size; ++i) { for (int i = 0; i < req.batch_size; ++i) {
auto pull_result = ongoing_produce.Pull(); auto pull_result = ongoing_produce.Pull();
result.state_and_frames.pull_state = pull_result.second; result.state_and_frames.pull_state = pull_result.second;
if (pull_result.second != RemotePullState::CURSOR_IN_PROGRESS) break; if (pull_result.second != PullState::CURSOR_IN_PROGRESS) break;
result.state_and_frames.frames.emplace_back(std::move(pull_result.first)); result.state_and_frames.frames.emplace_back(std::move(pull_result.first));
} }

View File

@ -24,7 +24,7 @@ namespace distributed {
/// master. Assumes that (tx_id, plan_id) uniquely identifies an execution, and /// master. Assumes that (tx_id, plan_id) uniquely identifies an execution, and
/// that there will never be parallel requests for the same execution thus /// that there will never be parallel requests for the same execution thus
/// identified. /// identified.
class RemoteProduceRpcServer { class ProduceRpcServer {
/// Encapsulates a Cursor execution in progress. Can be used for pulling a /// Encapsulates a Cursor execution in progress. Can be used for pulling a
/// single result from the execution, or pulling all and accumulating the /// single result from the execution, or pulling all and accumulating the
/// results. Accumulations are used for synchronizing updates in distributed /// results. Accumulations are used for synchronizing updates in distributed
@ -39,11 +39,11 @@ class RemoteProduceRpcServer {
/// Returns a vector of typed values (one for each `pull_symbol`), and an /// Returns a vector of typed values (one for each `pull_symbol`), and an
/// indication of the pull result. The result data is valid only if the /// indication of the pull result. The result data is valid only if the
/// returned state is CURSOR_IN_PROGRESS. /// returned state is CURSOR_IN_PROGRESS.
std::pair<std::vector<query::TypedValue>, RemotePullState> Pull(); std::pair<std::vector<query::TypedValue>, PullState> Pull();
/// Accumulates all the frames pulled from the cursor and returns /// Accumulates all the frames pulled from the cursor and returns
/// CURSOR_EXHAUSTED. If an error occurs, an appropriate value is returned. /// CURSOR_EXHAUSTED. If an error occurs, an appropriate value is returned.
RemotePullState Accumulate(); PullState Accumulate();
private: private:
database::GraphDbAccessor dba_; database::GraphDbAccessor dba_;
@ -51,18 +51,17 @@ class RemoteProduceRpcServer {
query::Context context_; query::Context context_;
std::vector<query::Symbol> pull_symbols_; std::vector<query::Symbol> pull_symbols_;
query::Frame frame_; query::Frame frame_;
RemotePullState cursor_state_{RemotePullState::CURSOR_IN_PROGRESS}; PullState cursor_state_{PullState::CURSOR_IN_PROGRESS};
std::vector<std::vector<query::TypedValue>> accumulation_; std::vector<std::vector<query::TypedValue>> accumulation_;
/// Pulls and returns a single result from the cursor. /// Pulls and returns a single result from the cursor.
std::pair<std::vector<query::TypedValue>, RemotePullState> std::pair<std::vector<query::TypedValue>, PullState> PullOneFromCursor();
PullOneFromCursor();
}; };
public: public:
RemoteProduceRpcServer(database::GraphDb &db, tx::Engine &tx_engine, ProduceRpcServer(database::GraphDb &db, tx::Engine &tx_engine,
communication::rpc::Server &server, communication::rpc::Server &server,
const distributed::PlanConsumer &plan_consumer); const distributed::PlanConsumer &plan_consumer);
/// Clears the cache of local transactions that have expired. The signature of /// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::TransactionalCacheCleaner`. /// this method is dictated by `distributed::TransactionalCacheCleaner`.
@ -70,7 +69,7 @@ class RemoteProduceRpcServer {
private: private:
database::GraphDb &db_; database::GraphDb &db_;
communication::rpc::Server &remote_produce_rpc_server_; communication::rpc::Server &produce_rpc_server_;
const distributed::PlanConsumer &plan_consumer_; const distributed::PlanConsumer &plan_consumer_;
ConcurrentMap<std::pair<tx::transaction_id_t, int64_t>, OngoingProduce> ConcurrentMap<std::pair<tx::transaction_id_t, int64_t>, OngoingProduce>
ongoing_produces_; ongoing_produces_;
@ -78,10 +77,10 @@ class RemoteProduceRpcServer {
/// Gets an ongoing produce for the given pull request. Creates a new one if /// Gets an ongoing produce for the given pull request. Creates a new one if
/// there is none currently existing. /// there is none currently existing.
OngoingProduce &GetOngoingProduce(const RemotePullReq &req); OngoingProduce &GetOngoingProduce(const PullReq &req);
/// Performs a single remote pull for the given request. /// Performs a single remote pull for the given request.
RemotePullResData RemotePull(const RemotePullReq &req); PullResData Pull(const PullReq &req);
}; };
} // namespace distributed } // namespace distributed

View File

@ -23,7 +23,7 @@ constexpr int kDefaultBatchSize = 20;
/// Returnd along with a batch of results in the remote-pull RPC. Indicates the /// Returnd along with a batch of results in the remote-pull RPC. Indicates the
/// state of execution on the worker. /// state of execution on the worker.
enum class RemotePullState { enum class PullState {
CURSOR_EXHAUSTED, CURSOR_EXHAUSTED,
CURSOR_IN_PROGRESS, CURSOR_IN_PROGRESS,
SERIALIZATION_ERROR, SERIALIZATION_ERROR,
@ -35,12 +35,11 @@ enum class RemotePullState {
QUERY_ERROR QUERY_ERROR
}; };
struct RemotePullReq : public communication::rpc::Message { struct PullReq : public communication::rpc::Message {
RemotePullReq() {} PullReq() {}
RemotePullReq(tx::transaction_id_t tx_id, tx::Snapshot tx_snapshot, PullReq(tx::transaction_id_t tx_id, tx::Snapshot tx_snapshot, int64_t plan_id,
int64_t plan_id, const Parameters &params, const Parameters &params, std::vector<query::Symbol> symbols,
std::vector<query::Symbol> symbols, bool accumulate, bool accumulate, int batch_size, bool send_old, bool send_new)
int batch_size, bool send_old, bool send_new)
: tx_id(tx_id), : tx_id(tx_id),
tx_snapshot(tx_snapshot), tx_snapshot(tx_snapshot),
plan_id(plan_id), plan_id(plan_id),
@ -109,10 +108,10 @@ struct RemotePullReq : public communication::rpc::Message {
BOOST_SERIALIZATION_SPLIT_MEMBER() BOOST_SERIALIZATION_SPLIT_MEMBER()
}; };
/// The data returned to the end consumer (the RemotePull operator). Contains /// The data returned to the end consumer (the Pull operator). Contains
/// only the relevant parts of the response, ready for use. /// only the relevant parts of the response, ready for use.
struct RemotePullData { struct PullData {
RemotePullState pull_state; PullState pull_state;
std::vector<std::vector<query::TypedValue>> frames; std::vector<std::vector<query::TypedValue>> frames;
}; };
@ -121,17 +120,17 @@ struct RemotePullData {
/// (possibly encapsulated in lists/maps) to their proper values. This requires /// (possibly encapsulated in lists/maps) to their proper values. This requires
/// a GraphDbAccessor and therefore can't be done as part of deserialization. /// a GraphDbAccessor and therefore can't be done as part of deserialization.
/// ///
/// TODO - make it possible to inject a &GraphDbAcessor from the RemotePull /// TODO - make it possible to inject a &GraphDbAcessor from the Pull
/// layer /// layer
/// all the way into RPC data deserialization to remove the requirement for /// all the way into RPC data deserialization to remove the requirement for
/// post-processing. The current approach of holding references to parts of the /// post-processing. The current approach of holding references to parts of the
/// frame (potentially embedded in lists/maps) is too error-prone. /// frame (potentially embedded in lists/maps) is too error-prone.
struct RemotePullResData { struct PullResData {
private: private:
// Temp cache for deserialized vertices and edges. These objects are created // Temp cache for deserialized vertices and edges. These objects are created
// during deserialization. They are used immediatelly after during // during deserialization. They are used immediatelly after during
// post-processing. The vertex/edge data ownership gets transfered to the // post-processing. The vertex/edge data ownership gets transfered to the
// RemoteCache, and the `element_in_frame` reference is used to set the // Cache, and the `element_in_frame` reference is used to set the
// appropriate accessor to the appropriate value. Not used on side that // appropriate accessor to the appropriate value. Not used on side that
// generates the response. // generates the response.
template <typename TRecord> template <typename TRecord>
@ -164,16 +163,16 @@ struct RemotePullResData {
}; };
public: public:
RemotePullResData() {} // Default constructor required for serialization. PullResData() {} // Default constructor required for serialization.
RemotePullResData(int worker_id, bool send_old, bool send_new) PullResData(int worker_id, bool send_old, bool send_new)
: worker_id(worker_id), send_old(send_old), send_new(send_new) {} : worker_id(worker_id), send_old(send_old), send_new(send_new) {}
RemotePullResData(const RemotePullResData &) = delete; PullResData(const PullResData &) = delete;
RemotePullResData &operator=(const RemotePullResData &) = delete; PullResData &operator=(const PullResData &) = delete;
RemotePullResData(RemotePullResData &&) = default; PullResData(PullResData &&) = default;
RemotePullResData &operator=(RemotePullResData &&) = default; PullResData &operator=(PullResData &&) = default;
RemotePullData state_and_frames; PullData state_and_frames;
// Id of the worker on which the response is created, used for serializing // Id of the worker on which the response is created, used for serializing
// vertices (converting local to global addresses). // vertices (converting local to global addresses).
int worker_id; int worker_id;
@ -182,7 +181,7 @@ struct RemotePullResData {
bool send_new; bool send_new;
// Temporary caches used between deserialization and post-processing // Temporary caches used between deserialization and post-processing
// (transfering the ownership of this data to a RemoteCache). // (transfering the ownership of this data to a Cache).
std::vector<GraphElementData<Vertex>> vertices; std::vector<GraphElementData<Vertex>> vertices;
std::vector<GraphElementData<Edge>> edges; std::vector<GraphElementData<Edge>> edges;
std::vector<PathData> paths; std::vector<PathData> paths;
@ -306,12 +305,12 @@ struct RemotePullResData {
} }
}; };
class RemotePullRes : public communication::rpc::Message { class PullRes : public communication::rpc::Message {
public: public:
RemotePullRes() {} PullRes() {}
RemotePullRes(RemotePullResData data) : data(std::move(data)) {} PullRes(PullResData data) : data(std::move(data)) {}
RemotePullResData data; PullResData data;
private: private:
friend class boost::serialization::access; friend class boost::serialization::access;
@ -362,11 +361,10 @@ class RemotePullRes : public communication::rpc::Message {
BOOST_SERIALIZATION_SPLIT_MEMBER() BOOST_SERIALIZATION_SPLIT_MEMBER()
}; };
using RemotePullRpc = using PullRpc = communication::rpc::RequestResponse<PullReq, PullRes>;
communication::rpc::RequestResponse<RemotePullReq, RemotePullRes>;
// TODO make a separate RPC for the continuation of an existing pull, as an // TODO make a separate RPC for the continuation of an existing pull, as an
// optimization not to have to send the full RemotePullReqData pack every // optimization not to have to send the full PullReqData pack every
// time. // time.
RPC_SINGLE_MEMBER_MESSAGE(TransactionCommandAdvancedReq, tx::transaction_id_t); RPC_SINGLE_MEMBER_MESSAGE(TransactionCommandAdvancedReq, tx::transaction_id_t);

View File

@ -1,26 +1,26 @@
#include <functional> #include <functional>
#include "distributed/remote_data_manager.hpp" #include "distributed/data_manager.hpp"
#include "distributed/remote_pull_rpc_clients.hpp" #include "distributed/pull_rpc_clients.hpp"
#include "storage/edge.hpp" #include "storage/edge.hpp"
#include "storage/vertex.hpp" #include "storage/vertex.hpp"
namespace distributed { namespace distributed {
utils::Future<RemotePullData> RemotePullRpcClients::RemotePull( utils::Future<PullData> PullRpcClients::Pull(
database::GraphDbAccessor &dba, int worker_id, int64_t plan_id, database::GraphDbAccessor &dba, int worker_id, int64_t plan_id,
const Parameters &params, const std::vector<query::Symbol> &symbols, const Parameters &params, const std::vector<query::Symbol> &symbols,
bool accumulate, int batch_size) { bool accumulate, int batch_size) {
return clients_.ExecuteOnWorker<RemotePullData>( return clients_.ExecuteOnWorker<PullData>(
worker_id, [&dba, plan_id, params, symbols, accumulate, worker_id, [&dba, plan_id, params, symbols, accumulate,
batch_size](ClientPool &client_pool) { batch_size](ClientPool &client_pool) {
auto result = client_pool.Call<RemotePullRpc>( auto result = client_pool.Call<PullRpc>(
dba.transaction_id(), dba.transaction().snapshot(), plan_id, params, dba.transaction_id(), dba.transaction().snapshot(), plan_id, params,
symbols, accumulate, batch_size, true, true); symbols, accumulate, batch_size, true, true);
auto handle_vertex = [&dba](auto &v) { auto handle_vertex = [&dba](auto &v) {
dba.db() dba.db()
.remote_data_manager() .data_manager()
.Elements<Vertex>(dba.transaction_id()) .Elements<Vertex>(dba.transaction_id())
.emplace(v.global_address.gid(), std::move(v.old_record), .emplace(v.global_address.gid(), std::move(v.old_record),
std::move(v.new_record)); std::move(v.new_record));
@ -31,7 +31,7 @@ utils::Future<RemotePullData> RemotePullRpcClients::RemotePull(
}; };
auto handle_edge = [&dba](auto &e) { auto handle_edge = [&dba](auto &e) {
dba.db() dba.db()
.remote_data_manager() .data_manager()
.Elements<Edge>(dba.transaction_id()) .Elements<Edge>(dba.transaction_id())
.emplace(e.global_address.gid(), std::move(e.old_record), .emplace(e.global_address.gid(), std::move(e.old_record),
std::move(e.new_record)); std::move(e.new_record));
@ -61,7 +61,7 @@ utils::Future<RemotePullData> RemotePullRpcClients::RemotePull(
} }
std::vector<utils::Future<void>> std::vector<utils::Future<void>>
RemotePullRpcClients::NotifyAllTransactionCommandAdvanced( PullRpcClients::NotifyAllTransactionCommandAdvanced(
tx::transaction_id_t tx_id) { tx::transaction_id_t tx_id) {
return clients_.ExecuteOnWorkers<void>(0, [tx_id](auto &client) { return clients_.ExecuteOnWorkers<void>(0, [tx_id](auto &client) {
auto res = client.template Call<TransactionCommandAdvancedRpc>(tx_id); auto res = client.template Call<TransactionCommandAdvancedRpc>(tx_id);

View File

@ -3,7 +3,7 @@
#include <vector> #include <vector>
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "distributed/remote_pull_produce_rpc_messages.hpp" #include "distributed/pull_produce_rpc_messages.hpp"
#include "distributed/rpc_worker_clients.hpp" #include "distributed/rpc_worker_clients.hpp"
#include "query/frontend/semantic/symbol.hpp" #include "query/frontend/semantic/symbol.hpp"
#include "query/parameters.hpp" #include "query/parameters.hpp"
@ -16,23 +16,24 @@ namespace distributed {
/// and getting the results of that execution. The results are returned in /// and getting the results of that execution. The results are returned in
/// batches and are therefore accompanied with an enum indicator of the state of /// batches and are therefore accompanied with an enum indicator of the state of
/// remote execution. /// remote execution.
class RemotePullRpcClients { class PullRpcClients {
using ClientPool = communication::rpc::ClientPool; using ClientPool = communication::rpc::ClientPool;
public: public:
RemotePullRpcClients(RpcWorkerClients &clients) : clients_(clients) {} PullRpcClients(RpcWorkerClients &clients) : clients_(clients) {}
/// Calls a remote pull asynchroniously. IMPORTANT: take care not to call this /// Calls a remote pull asynchroniously. IMPORTANT: take care not to call this
/// function for the same (tx_id, worker_id, plan_id) before the previous call /// function for the same (tx_id, worker_id, plan_id) before the previous call
/// has ended. /// has ended.
/// ///
/// @todo: it might be cleaner to split RemotePull into {InitRemoteCursor, /// @todo: it might be cleaner to split Pull into {InitRemoteCursor,
/// RemotePull, RemoteAccumulate}, but that's a lot of refactoring and more /// Pull, RemoteAccumulate}, but that's a lot of refactoring and more
/// RPC calls. /// RPC calls.
utils::Future<RemotePullData> RemotePull( utils::Future<PullData> Pull(database::GraphDbAccessor &dba, int worker_id,
database::GraphDbAccessor &dba, int worker_id, int64_t plan_id, int64_t plan_id, const Parameters &params,
const Parameters &params, const std::vector<query::Symbol> &symbols, const std::vector<query::Symbol> &symbols,
bool accumulate, int batch_size = kDefaultBatchSize); bool accumulate,
int batch_size = kDefaultBatchSize);
auto GetWorkerIds() { return clients_.GetWorkerIds(); } auto GetWorkerIds() { return clients_.GetWorkerIds(); }

View File

@ -1,59 +0,0 @@
#include "distributed/remote_data_manager.hpp"
#include "database/storage.hpp"
namespace distributed {
template <typename TRecord>
RemoteCache<TRecord> &RemoteDataManager::GetCache(CacheT<TRecord> &collection,
tx::transaction_id_t tx_id) {
auto access = collection.access();
auto found = access.find(tx_id);
if (found != access.end()) return found->second;
return access
.emplace(
tx_id, std::make_tuple(tx_id),
std::make_tuple(std::ref(storage_), std::ref(remote_data_clients_)))
.first->second;
}
template <>
RemoteCache<Vertex> &RemoteDataManager::Elements<Vertex>(
tx::transaction_id_t tx_id) {
return GetCache(vertices_caches_, tx_id);
}
template <>
RemoteCache<Edge> &RemoteDataManager::Elements<Edge>(
tx::transaction_id_t tx_id) {
return GetCache(edges_caches_, tx_id);
}
RemoteDataManager::RemoteDataManager(
database::Storage &storage,
distributed::RemoteDataRpcClients &remote_data_clients)
: storage_(storage), remote_data_clients_(remote_data_clients) {}
void RemoteDataManager::ClearCacheForSingleTransaction(
tx::transaction_id_t tx_id) {
Elements<Vertex>(tx_id).ClearCache();
Elements<Edge>(tx_id).ClearCache();
}
void RemoteDataManager::ClearTransactionalCache(
tx::transaction_id_t oldest_active) {
auto vertex_access = vertices_caches_.access();
for (auto &kv : vertex_access) {
if (kv.first < oldest_active) {
vertex_access.remove(kv.first);
}
}
auto edge_access = edges_caches_.access();
for (auto &kv : edge_access) {
if (kv.first < oldest_active) {
edge_access.remove(kv.first);
}
}
}
} // namespace distributed

View File

@ -1,26 +0,0 @@
#include "distributed/remote_data_rpc_clients.hpp"
#include "distributed/remote_data_rpc_messages.hpp"
#include "storage/edge.hpp"
#include "storage/vertex.hpp"
namespace distributed {
template <>
std::unique_ptr<Edge> RemoteDataRpcClients::RemoteElement(
int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) {
auto response = clients_.GetClientPool(worker_id).Call<RemoteEdgeRpc>(
TxGidPair{tx_id, gid});
CHECK(response) << "RemoteEdgeRpc failed";
return std::move(response->name_output_);
}
template <>
std::unique_ptr<Vertex> RemoteDataRpcClients::RemoteElement(
int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) {
auto response = clients_.GetClientPool(worker_id).Call<RemoteVertexRpc>(
TxGidPair{tx_id, gid});
CHECK(response) << "RemoteVertexRpc failed";
return std::move(response->name_output_);
}
} // namespace distributed

View File

@ -1,28 +0,0 @@
#include <memory>
#include "database/graph_db_accessor.hpp"
#include "distributed/remote_data_rpc_messages.hpp"
#include "remote_data_rpc_server.hpp"
namespace distributed {
RemoteDataRpcServer::RemoteDataRpcServer(database::GraphDb &db,
communication::rpc::Server &server)
: db_(db), rpc_server_(server) {
rpc_server_.Register<RemoteVertexRpc>([this](const RemoteVertexReq &req) {
database::GraphDbAccessor dba(db_, req.member.tx_id);
auto vertex = dba.FindVertex(req.member.gid, false);
CHECK(vertex.GetOld())
<< "Old record must exist when sending vertex by RPC";
return std::make_unique<RemoteVertexRes>(vertex.GetOld(), db_.WorkerId());
});
rpc_server_.Register<RemoteEdgeRpc>([this](const RemoteEdgeReq &req) {
database::GraphDbAccessor dba(db_, req.member.tx_id);
auto edge = dba.FindEdge(req.member.gid, false);
CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC";
return std::make_unique<RemoteEdgeRes>(edge.GetOld(), db_.WorkerId());
});
}
} // namespace distributed

View File

@ -1,125 +0,0 @@
#include <unordered_map>
#include <vector>
#include "distributed/remote_updates_rpc_clients.hpp"
#include "query/exceptions.hpp"
namespace distributed {
namespace {
void RaiseIfRemoteError(RemoteUpdateResult result) {
switch (result) {
case RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
throw query::RemoveAttachedVertexException();
case RemoteUpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError();
case RemoteUpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException(
"Remote LockTimeoutError during edge creation");
case RemoteUpdateResult::UPDATE_DELETED_ERROR:
throw RecordDeletedError();
case RemoteUpdateResult::DONE:
break;
}
}
}
RemoteUpdateResult RemoteUpdatesRpcClients::RemoteUpdate(
int worker_id, const database::StateDelta &delta) {
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteUpdateRpc>(delta);
CHECK(res) << "RemoteUpdateRpc failed on worker: " << worker_id;
return res->member;
}
gid::Gid RemoteUpdatesRpcClients::RemoteCreateVertex(
int worker_id, tx::transaction_id_t tx_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties) {
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteCreateVertexRpc>(
RemoteCreateVertexReqData{tx_id, labels, properties});
CHECK(res) << "RemoteCreateVertexRpc failed on worker: " << worker_id;
CHECK(res->member.result == RemoteUpdateResult::DONE)
<< "Remote Vertex creation result not RemoteUpdateResult::DONE";
return res->member.gid;
}
storage::EdgeAddress RemoteUpdatesRpcClients::RemoteCreateEdge(
tx::transaction_id_t tx_id, VertexAccessor &from, VertexAccessor &to,
storage::EdgeType edge_type) {
CHECK(from.address().is_remote())
<< "In RemoteCreateEdge `from` must be remote";
int from_worker = from.address().worker_id();
auto res = worker_clients_.GetClientPool(from_worker)
.Call<RemoteCreateEdgeRpc>(RemoteCreateEdgeReqData{
from.gid(), to.GlobalAddress(), edge_type, tx_id});
CHECK(res) << "RemoteCreateEdge RPC failed on worker: " << from_worker;
RaiseIfRemoteError(res->member.result);
return {res->member.gid, from_worker};
}
void RemoteUpdatesRpcClients::RemoteAddInEdge(tx::transaction_id_t tx_id,
VertexAccessor &from,
storage::EdgeAddress edge_address,
VertexAccessor &to,
storage::EdgeType edge_type) {
CHECK(to.address().is_remote() && edge_address.is_remote() &&
(from.GlobalAddress().worker_id() != to.address().worker_id()))
<< "RemoteAddInEdge should only be called when `to` is remote and "
"`from` is not on the same worker as `to`.";
auto worker_id = to.GlobalAddress().worker_id();
auto res = worker_clients_.GetClientPool(worker_id).Call<RemoteAddInEdgeRpc>(
RemoteAddInEdgeReqData{from.GlobalAddress(), edge_address, to.gid(),
edge_type, tx_id});
CHECK(res) << "RemoteAddInEdge RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
void RemoteUpdatesRpcClients::RemoteRemoveVertex(int worker_id,
tx::transaction_id_t tx_id,
gid::Gid gid,
bool check_empty) {
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteRemoveVertexRpc>(
RemoteRemoveVertexReqData{gid, tx_id, check_empty});
CHECK(res) << "RemoteRemoveVertex RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
void RemoteUpdatesRpcClients::RemoteRemoveEdge(
tx::transaction_id_t tx_id, int worker_id, gid::Gid edge_gid,
gid::Gid vertex_from_id, storage::VertexAddress vertex_to_addr) {
auto res = worker_clients_.GetClientPool(worker_id).Call<RemoteRemoveEdgeRpc>(
RemoteRemoveEdgeData{tx_id, edge_gid, vertex_from_id, vertex_to_addr});
CHECK(res) << "RemoteRemoveEdge RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
void RemoteUpdatesRpcClients::RemoteRemoveInEdge(
tx::transaction_id_t tx_id, int worker_id, gid::Gid vertex_id,
storage::EdgeAddress edge_address) {
CHECK(edge_address.is_remote())
<< "RemoteRemoveInEdge edge_address is local.";
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteRemoveInEdgeRpc>(
RemoteRemoveInEdgeData{tx_id, vertex_id, edge_address});
CHECK(res) << "RemoteRemoveInEdge RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
std::vector<utils::Future<RemoteUpdateResult>>
RemoteUpdatesRpcClients::RemoteUpdateApplyAll(int skip_worker_id,
tx::transaction_id_t tx_id) {
return worker_clients_.ExecuteOnWorkers<RemoteUpdateResult>(
skip_worker_id, [tx_id](auto &client) {
auto res = client.template Call<RemoteUpdateApplyRpc>(tx_id);
CHECK(res) << "RemoteUpdateApplyRpc failed";
return res->member;
});
}
} // namespace distributed

View File

@ -0,0 +1,116 @@
#include <unordered_map>
#include <vector>
#include "distributed/updates_rpc_clients.hpp"
#include "query/exceptions.hpp"
namespace distributed {
namespace {
void RaiseIfRemoteError(UpdateResult result) {
switch (result) {
case UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
throw query::RemoveAttachedVertexException();
case UpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError();
case UpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException(
"Remote LockTimeoutError during edge creation");
case UpdateResult::UPDATE_DELETED_ERROR:
throw RecordDeletedError();
case UpdateResult::DONE:
break;
}
}
}
UpdateResult UpdatesRpcClients::Update(int worker_id,
const database::StateDelta &delta) {
auto res = worker_clients_.GetClientPool(worker_id).Call<UpdateRpc>(delta);
CHECK(res) << "UpdateRpc failed on worker: " << worker_id;
return res->member;
}
gid::Gid UpdatesRpcClients::CreateVertex(
int worker_id, tx::transaction_id_t tx_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties) {
auto res = worker_clients_.GetClientPool(worker_id).Call<CreateVertexRpc>(
CreateVertexReqData{tx_id, labels, properties});
CHECK(res) << "CreateVertexRpc failed on worker: " << worker_id;
CHECK(res->member.result == UpdateResult::DONE)
<< "Remote Vertex creation result not UpdateResult::DONE";
return res->member.gid;
}
storage::EdgeAddress UpdatesRpcClients::CreateEdge(
tx::transaction_id_t tx_id, VertexAccessor &from, VertexAccessor &to,
storage::EdgeType edge_type) {
CHECK(from.address().is_remote()) << "In CreateEdge `from` must be remote";
int from_worker = from.address().worker_id();
auto res = worker_clients_.GetClientPool(from_worker)
.Call<CreateEdgeRpc>(CreateEdgeReqData{
from.gid(), to.GlobalAddress(), edge_type, tx_id});
CHECK(res) << "CreateEdge RPC failed on worker: " << from_worker;
RaiseIfRemoteError(res->member.result);
return {res->member.gid, from_worker};
}
void UpdatesRpcClients::AddInEdge(tx::transaction_id_t tx_id,
VertexAccessor &from,
storage::EdgeAddress edge_address,
VertexAccessor &to,
storage::EdgeType edge_type) {
CHECK(to.address().is_remote() && edge_address.is_remote() &&
(from.GlobalAddress().worker_id() != to.address().worker_id()))
<< "AddInEdge should only be called when `to` is remote and "
"`from` is not on the same worker as `to`.";
auto worker_id = to.GlobalAddress().worker_id();
auto res = worker_clients_.GetClientPool(worker_id).Call<AddInEdgeRpc>(
AddInEdgeReqData{from.GlobalAddress(), edge_address, to.gid(), edge_type,
tx_id});
CHECK(res) << "AddInEdge RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
void UpdatesRpcClients::RemoveVertex(int worker_id, tx::transaction_id_t tx_id,
gid::Gid gid, bool check_empty) {
auto res = worker_clients_.GetClientPool(worker_id).Call<RemoveVertexRpc>(
RemoveVertexReqData{gid, tx_id, check_empty});
CHECK(res) << "RemoveVertex RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
void UpdatesRpcClients::RemoveEdge(tx::transaction_id_t tx_id, int worker_id,
gid::Gid edge_gid, gid::Gid vertex_from_id,
storage::VertexAddress vertex_to_addr) {
auto res = worker_clients_.GetClientPool(worker_id).Call<RemoveEdgeRpc>(
RemoveEdgeData{tx_id, edge_gid, vertex_from_id, vertex_to_addr});
CHECK(res) << "RemoveEdge RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
void UpdatesRpcClients::RemoveInEdge(tx::transaction_id_t tx_id, int worker_id,
gid::Gid vertex_id,
storage::EdgeAddress edge_address) {
CHECK(edge_address.is_remote()) << "RemoveInEdge edge_address is local.";
auto res = worker_clients_.GetClientPool(worker_id).Call<RemoveInEdgeRpc>(
RemoveInEdgeData{tx_id, vertex_id, edge_address});
CHECK(res) << "RemoveInEdge RPC failed on worker: " << worker_id;
RaiseIfRemoteError(res->member);
}
std::vector<utils::Future<UpdateResult>> UpdatesRpcClients::UpdateApplyAll(
int skip_worker_id, tx::transaction_id_t tx_id) {
return worker_clients_.ExecuteOnWorkers<UpdateResult>(
skip_worker_id, [tx_id](auto &client) {
auto res = client.template Call<UpdateApplyRpc>(tx_id);
CHECK(res) << "UpdateApplyRpc failed";
return res->member;
});
}
} // namespace distributed

View File

@ -4,8 +4,8 @@
#include <vector> #include <vector>
#include "database/state_delta.hpp" #include "database/state_delta.hpp"
#include "distributed/remote_updates_rpc_messages.hpp"
#include "distributed/rpc_worker_clients.hpp" #include "distributed/rpc_worker_clients.hpp"
#include "distributed/updates_rpc_messages.hpp"
#include "query/typed_value.hpp" #include "query/typed_value.hpp"
#include "storage/address_types.hpp" #include "storage/address_types.hpp"
#include "storage/gid.hpp" #include "storage/gid.hpp"
@ -18,17 +18,16 @@ namespace distributed {
/// Exposes the functionality to send updates to other workers (that own the /// Exposes the functionality to send updates to other workers (that own the
/// graph element we are updating). Also enables us to call for a worker to /// graph element we are updating). Also enables us to call for a worker to
/// apply the accumulated deferred updates, or discard them. /// apply the accumulated deferred updates, or discard them.
class RemoteUpdatesRpcClients { class UpdatesRpcClients {
public: public:
explicit RemoteUpdatesRpcClients(RpcWorkerClients &clients) explicit UpdatesRpcClients(RpcWorkerClients &clients)
: worker_clients_(clients) {} : worker_clients_(clients) {}
/// Sends an update delta to the given worker. /// Sends an update delta to the given worker.
RemoteUpdateResult RemoteUpdate(int worker_id, UpdateResult Update(int worker_id, const database::StateDelta &delta);
const database::StateDelta &delta);
/// Creates a vertex on the given worker and returns it's id. /// Creates a vertex on the given worker and returns it's id.
gid::Gid RemoteCreateVertex( gid::Gid CreateVertex(
int worker_id, tx::transaction_id_t tx_id, int worker_id, tx::transaction_id_t tx_id,
const std::vector<storage::Label> &labels, const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue> const std::unordered_map<storage::Property, query::TypedValue>
@ -37,39 +36,37 @@ class RemoteUpdatesRpcClients {
/// Creates an edge on the given worker and returns it's address. If the `to` /// Creates an edge on the given worker and returns it's address. If the `to`
/// vertex is on the same worker as `from`, then all remote CRUD will be /// vertex is on the same worker as `from`, then all remote CRUD will be
/// handled by a call to this function. Otherwise a separate call to /// handled by a call to this function. Otherwise a separate call to
/// `RemoteAddInEdge` might be necessary. Throws all the exceptions that can /// `AddInEdge` might be necessary. Throws all the exceptions that can
/// occur remotely as a result of updating a vertex. /// occur remotely as a result of updating a vertex.
storage::EdgeAddress RemoteCreateEdge(tx::transaction_id_t tx_id, storage::EdgeAddress CreateEdge(tx::transaction_id_t tx_id,
VertexAccessor &from, VertexAccessor &from, VertexAccessor &to,
VertexAccessor &to, storage::EdgeType edge_type);
storage::EdgeType edge_type);
/// Adds the edge with the given address to the `to` vertex as an incoming /// Adds the edge with the given address to the `to` vertex as an incoming
/// edge. Only used when `to` is remote and not on the same worker as `from`. /// edge. Only used when `to` is remote and not on the same worker as `from`.
void RemoteAddInEdge(tx::transaction_id_t tx_id, VertexAccessor &from, void AddInEdge(tx::transaction_id_t tx_id, VertexAccessor &from,
storage::EdgeAddress edge_address, VertexAccessor &to, storage::EdgeAddress edge_address, VertexAccessor &to,
storage::EdgeType edge_type); storage::EdgeType edge_type);
/// Removes a vertex from the other worker. /// Removes a vertex from the other worker.
void RemoteRemoveVertex(int worker_id, tx::transaction_id_t tx_id, void RemoveVertex(int worker_id, tx::transaction_id_t tx_id, gid::Gid gid,
gid::Gid gid, bool check_empty); bool check_empty);
/// Removes an edge on another worker. This also handles the `from` vertex /// Removes an edge on another worker. This also handles the `from` vertex
/// outgoing edge, as that vertex is on the same worker as the edge. If the /// outgoing edge, as that vertex is on the same worker as the edge. If the
/// `to` vertex is on the same worker, then that side is handled too by the /// `to` vertex is on the same worker, then that side is handled too by the
/// single RPC call, otherwise a separate call has to be made to /// single RPC call, otherwise a separate call has to be made to
/// RemoteRemoveInEdge. /// RemoveInEdge.
void RemoteRemoveEdge(tx::transaction_id_t tx_id, int worker_id, void RemoveEdge(tx::transaction_id_t tx_id, int worker_id, gid::Gid edge_gid,
gid::Gid edge_gid, gid::Gid vertex_from_id, gid::Gid vertex_from_id,
storage::VertexAddress vertex_to_addr); storage::VertexAddress vertex_to_addr);
void RemoteRemoveInEdge(tx::transaction_id_t tx_id, int worker_id, void RemoveInEdge(tx::transaction_id_t tx_id, int worker_id,
gid::Gid vertex_id, gid::Gid vertex_id, storage::EdgeAddress edge_address);
storage::EdgeAddress edge_address);
/// Calls for all the workers (except the given one) to apply their updates /// Calls for all the workers (except the given one) to apply their updates
/// and returns the future results. /// and returns the future results.
std::vector<utils::Future<RemoteUpdateResult>> RemoteUpdateApplyAll( std::vector<utils::Future<UpdateResult>> UpdateApplyAll(
int skip_worker_id, tx::transaction_id_t tx_id); int skip_worker_id, tx::transaction_id_t tx_id);
private: private:

View File

@ -14,7 +14,7 @@
namespace distributed { namespace distributed {
/// The result of sending or applying a deferred update to a worker. /// The result of sending or applying a deferred update to a worker.
enum class RemoteUpdateResult { enum class UpdateResult {
DONE, DONE,
SERIALIZATION_ERROR, SERIALIZATION_ERROR,
LOCK_TIMEOUT_ERROR, LOCK_TIMEOUT_ERROR,
@ -22,19 +22,17 @@ enum class RemoteUpdateResult {
UNABLE_TO_DELETE_VERTEX_ERROR UNABLE_TO_DELETE_VERTEX_ERROR
}; };
RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateReq, database::StateDelta); RPC_SINGLE_MEMBER_MESSAGE(UpdateReq, database::StateDelta);
RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateRes, RemoteUpdateResult); RPC_SINGLE_MEMBER_MESSAGE(UpdateRes, UpdateResult);
using RemoteUpdateRpc = using UpdateRpc = communication::rpc::RequestResponse<UpdateReq, UpdateRes>;
communication::rpc::RequestResponse<RemoteUpdateReq, RemoteUpdateRes>;
RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateApplyReq, tx::transaction_id_t); RPC_SINGLE_MEMBER_MESSAGE(UpdateApplyReq, tx::transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateApplyRes, RemoteUpdateResult); RPC_SINGLE_MEMBER_MESSAGE(UpdateApplyRes, UpdateResult);
using RemoteUpdateApplyRpc = using UpdateApplyRpc =
communication::rpc::RequestResponse<RemoteUpdateApplyReq, communication::rpc::RequestResponse<UpdateApplyReq, UpdateApplyRes>;
RemoteUpdateApplyRes>;
struct RemoteCreateResult { struct CreateResult {
RemoteUpdateResult result; UpdateResult result;
// Only valid if creation was successful. // Only valid if creation was successful.
gid::Gid gid; gid::Gid gid;
@ -48,7 +46,7 @@ struct RemoteCreateResult {
} }
}; };
struct RemoteCreateVertexReqData { struct CreateVertexReqData {
tx::transaction_id_t tx_id; tx::transaction_id_t tx_id;
std::vector<storage::Label> labels; std::vector<storage::Label> labels;
std::unordered_map<storage::Property, query::TypedValue> properties; std::unordered_map<storage::Property, query::TypedValue> properties;
@ -84,13 +82,12 @@ struct RemoteCreateVertexReqData {
BOOST_SERIALIZATION_SPLIT_MEMBER() BOOST_SERIALIZATION_SPLIT_MEMBER()
}; };
RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateVertexReq, RemoteCreateVertexReqData); RPC_SINGLE_MEMBER_MESSAGE(CreateVertexReq, CreateVertexReqData);
RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateVertexRes, RemoteCreateResult); RPC_SINGLE_MEMBER_MESSAGE(CreateVertexRes, CreateResult);
using RemoteCreateVertexRpc = using CreateVertexRpc =
communication::rpc::RequestResponse<RemoteCreateVertexReq, communication::rpc::RequestResponse<CreateVertexReq, CreateVertexRes>;
RemoteCreateVertexRes>;
struct RemoteCreateEdgeReqData { struct CreateEdgeReqData {
gid::Gid from; gid::Gid from;
storage::VertexAddress to; storage::VertexAddress to;
storage::EdgeType edge_type; storage::EdgeType edge_type;
@ -108,13 +105,12 @@ struct RemoteCreateEdgeReqData {
} }
}; };
RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateEdgeReq, RemoteCreateEdgeReqData); RPC_SINGLE_MEMBER_MESSAGE(CreateEdgeReq, CreateEdgeReqData);
RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateEdgeRes, RemoteCreateResult); RPC_SINGLE_MEMBER_MESSAGE(CreateEdgeRes, CreateResult);
using RemoteCreateEdgeRpc = using CreateEdgeRpc =
communication::rpc::RequestResponse<RemoteCreateEdgeReq, communication::rpc::RequestResponse<CreateEdgeReq, CreateEdgeRes>;
RemoteCreateEdgeRes>;
struct RemoteAddInEdgeReqData { struct AddInEdgeReqData {
storage::VertexAddress from; storage::VertexAddress from;
storage::EdgeAddress edge_address; storage::EdgeAddress edge_address;
gid::Gid to; gid::Gid to;
@ -134,12 +130,12 @@ struct RemoteAddInEdgeReqData {
} }
}; };
RPC_SINGLE_MEMBER_MESSAGE(RemoteAddInEdgeReq, RemoteAddInEdgeReqData); RPC_SINGLE_MEMBER_MESSAGE(AddInEdgeReq, AddInEdgeReqData);
RPC_SINGLE_MEMBER_MESSAGE(RemoteAddInEdgeRes, RemoteUpdateResult); RPC_SINGLE_MEMBER_MESSAGE(AddInEdgeRes, UpdateResult);
using RemoteAddInEdgeRpc = using AddInEdgeRpc =
communication::rpc::RequestResponse<RemoteAddInEdgeReq, RemoteAddInEdgeRes>; communication::rpc::RequestResponse<AddInEdgeReq, AddInEdgeRes>;
struct RemoteRemoveVertexReqData { struct RemoveVertexReqData {
gid::Gid gid; gid::Gid gid;
tx::transaction_id_t tx_id; tx::transaction_id_t tx_id;
bool check_empty; bool check_empty;
@ -155,13 +151,12 @@ struct RemoteRemoveVertexReqData {
} }
}; };
RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveVertexReq, RemoteRemoveVertexReqData); RPC_SINGLE_MEMBER_MESSAGE(RemoveVertexReq, RemoveVertexReqData);
RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveVertexRes, RemoteUpdateResult); RPC_SINGLE_MEMBER_MESSAGE(RemoveVertexRes, UpdateResult);
using RemoteRemoveVertexRpc = using RemoveVertexRpc =
communication::rpc::RequestResponse<RemoteRemoveVertexReq, communication::rpc::RequestResponse<RemoveVertexReq, RemoveVertexRes>;
RemoteRemoveVertexRes>;
struct RemoteRemoveEdgeData { struct RemoveEdgeData {
tx::transaction_id_t tx_id; tx::transaction_id_t tx_id;
gid::Gid edge_id; gid::Gid edge_id;
gid::Gid vertex_from_id; gid::Gid vertex_from_id;
@ -179,13 +174,12 @@ struct RemoteRemoveEdgeData {
} }
}; };
RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveEdgeReq, RemoteRemoveEdgeData); RPC_SINGLE_MEMBER_MESSAGE(RemoveEdgeReq, RemoveEdgeData);
RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveEdgeRes, RemoteUpdateResult); RPC_SINGLE_MEMBER_MESSAGE(RemoveEdgeRes, UpdateResult);
using RemoteRemoveEdgeRpc = using RemoveEdgeRpc =
communication::rpc::RequestResponse<RemoteRemoveEdgeReq, communication::rpc::RequestResponse<RemoveEdgeReq, RemoveEdgeRes>;
RemoteRemoveEdgeRes>;
struct RemoteRemoveInEdgeData { struct RemoveInEdgeData {
tx::transaction_id_t tx_id; tx::transaction_id_t tx_id;
gid::Gid vertex; gid::Gid vertex;
storage::EdgeAddress edge_address; storage::EdgeAddress edge_address;
@ -201,10 +195,9 @@ struct RemoteRemoveInEdgeData {
} }
}; };
RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveInEdgeReq, RemoteRemoveInEdgeData); RPC_SINGLE_MEMBER_MESSAGE(RemoveInEdgeReq, RemoveInEdgeData);
RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveInEdgeRes, RemoteUpdateResult); RPC_SINGLE_MEMBER_MESSAGE(RemoveInEdgeRes, UpdateResult);
using RemoteRemoveInEdgeRpc = using RemoveInEdgeRpc =
communication::rpc::RequestResponse<RemoteRemoveInEdgeReq, communication::rpc::RequestResponse<RemoveInEdgeReq, RemoveInEdgeRes>;
RemoteRemoveInEdgeRes>;
} // namespace distributed } // namespace distributed

View File

@ -2,14 +2,13 @@
#include "glog/logging.h" #include "glog/logging.h"
#include "distributed/remote_updates_rpc_server.hpp" #include "distributed/updates_rpc_server.hpp"
#include "threading/sync/lock_timeout_exception.hpp" #include "threading/sync/lock_timeout_exception.hpp"
namespace distributed { namespace distributed {
template <typename TRecordAccessor> template <typename TRecordAccessor>
RemoteUpdateResult UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Emplace(
RemoteUpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Emplace(
const database::StateDelta &delta) { const database::StateDelta &delta) {
auto gid = std::is_same<TRecordAccessor, VertexAccessor>::value auto gid = std::is_same<TRecordAccessor, VertexAccessor>::value
? delta.vertex_id ? delta.vertex_id
@ -50,18 +49,17 @@ RemoteUpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Emplace(
// try { // try {
// found->second.first.update(); // found->second.first.update();
// } catch (const mvcc::SerializationError &) { // } catch (const mvcc::SerializationError &) {
// return RemoteUpdateResult::SERIALIZATION_ERROR; // return UpdateResult::SERIALIZATION_ERROR;
// } catch (const RecordDeletedError &) { // } catch (const RecordDeletedError &) {
// return RemoteUpdateResult::UPDATE_DELETED_ERROR; // return UpdateResult::UPDATE_DELETED_ERROR;
// } catch (const LockTimeoutException &) { // } catch (const LockTimeoutException &) {
// return RemoteUpdateResult::LOCK_TIMEOUT_ERROR; // return UpdateResult::LOCK_TIMEOUT_ERROR;
// } // }
return RemoteUpdateResult::DONE; return UpdateResult::DONE;
} }
template <typename TRecordAccessor> template <typename TRecordAccessor>
gid::Gid gid::Gid UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateVertex(
RemoteUpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateVertex(
const std::vector<storage::Label> &labels, const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue> const std::unordered_map<storage::Property, query::TypedValue>
&properties) { &properties) {
@ -75,8 +73,7 @@ RemoteUpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateVertex(
} }
template <typename TRecordAccessor> template <typename TRecordAccessor>
gid::Gid gid::Gid UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateEdge(
RemoteUpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateEdge(
gid::Gid from, storage::VertexAddress to, storage::EdgeType edge_type) { gid::Gid from, storage::VertexAddress to, storage::EdgeType edge_type) {
auto &db = db_accessor_.db(); auto &db = db_accessor_.db();
auto edge = db_accessor_.InsertOnlyEdge( auto edge = db_accessor_.InsertOnlyEdge(
@ -89,8 +86,7 @@ RemoteUpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateEdge(
} }
template <typename TRecordAccessor> template <typename TRecordAccessor>
RemoteUpdateResult UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Apply() {
RemoteUpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Apply() {
std::lock_guard<SpinLock> guard{lock_}; std::lock_guard<SpinLock> guard{lock_};
for (auto &kv : deltas_) { for (auto &kv : deltas_) {
auto &record_accessor = kv.second.first; auto &record_accessor = kv.second.first;
@ -113,7 +109,7 @@ RemoteUpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Apply() {
if (!db_accessor().RemoveVertex( if (!db_accessor().RemoveVertex(
reinterpret_cast<VertexAccessor &>(record_accessor), reinterpret_cast<VertexAccessor &>(record_accessor),
delta.check_empty)) { delta.check_empty)) {
return RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR; return UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR;
} }
break; break;
case database::StateDelta::Type::SET_PROPERTY_VERTEX: case database::StateDelta::Type::SET_PROPERTY_VERTEX:
@ -164,21 +160,21 @@ RemoteUpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Apply() {
break; break;
} }
} catch (const mvcc::SerializationError &) { } catch (const mvcc::SerializationError &) {
return RemoteUpdateResult::SERIALIZATION_ERROR; return UpdateResult::SERIALIZATION_ERROR;
} catch (const RecordDeletedError &) { } catch (const RecordDeletedError &) {
return RemoteUpdateResult::UPDATE_DELETED_ERROR; return UpdateResult::UPDATE_DELETED_ERROR;
} catch (const LockTimeoutException &) { } catch (const LockTimeoutException &) {
return RemoteUpdateResult::LOCK_TIMEOUT_ERROR; return UpdateResult::LOCK_TIMEOUT_ERROR;
} }
} }
} }
return RemoteUpdateResult::DONE; return UpdateResult::DONE;
} }
RemoteUpdatesRpcServer::RemoteUpdatesRpcServer( UpdatesRpcServer::UpdatesRpcServer(database::GraphDb &db,
database::GraphDb &db, communication::rpc::Server &server) communication::rpc::Server &server)
: db_(db) { : db_(db) {
server.Register<RemoteUpdateRpc>([this](const RemoteUpdateReq &req) { server.Register<UpdateRpc>([this](const UpdateReq &req) {
using DeltaType = database::StateDelta::Type; using DeltaType = database::StateDelta::Type;
auto &delta = req.member; auto &delta = req.member;
switch (delta.type) { switch (delta.type) {
@ -187,10 +183,10 @@ RemoteUpdatesRpcServer::RemoteUpdatesRpcServer(
case DeltaType::REMOVE_LABEL: case DeltaType::REMOVE_LABEL:
case database::StateDelta::Type::REMOVE_OUT_EDGE: case database::StateDelta::Type::REMOVE_OUT_EDGE:
case database::StateDelta::Type::REMOVE_IN_EDGE: case database::StateDelta::Type::REMOVE_IN_EDGE:
return std::make_unique<RemoteUpdateRes>( return std::make_unique<UpdateRes>(
GetUpdates(vertex_updates_, delta.transaction_id).Emplace(delta)); GetUpdates(vertex_updates_, delta.transaction_id).Emplace(delta));
case DeltaType::SET_PROPERTY_EDGE: case DeltaType::SET_PROPERTY_EDGE:
return std::make_unique<RemoteUpdateRes>( return std::make_unique<UpdateRes>(
GetUpdates(edge_updates_, delta.transaction_id).Emplace(delta)); GetUpdates(edge_updates_, delta.transaction_id).Emplace(delta));
default: default:
LOG(FATAL) << "Can't perform a remote update with delta type: " LOG(FATAL) << "Can't perform a remote update with delta type: "
@ -198,26 +194,24 @@ RemoteUpdatesRpcServer::RemoteUpdatesRpcServer(
} }
}); });
server.Register<RemoteUpdateApplyRpc>( server.Register<UpdateApplyRpc>([this](const UpdateApplyReq &req) {
[this](const RemoteUpdateApplyReq &req) { return std::make_unique<UpdateApplyRes>(Apply(req.member));
return std::make_unique<RemoteUpdateApplyRes>(Apply(req.member));
});
server.Register<RemoteCreateVertexRpc>([this](
const RemoteCreateVertexReq &req) {
gid::Gid gid = GetUpdates(vertex_updates_, req.member.tx_id)
.CreateVertex(req.member.labels, req.member.properties);
return std::make_unique<RemoteCreateVertexRes>(
RemoteCreateResult{RemoteUpdateResult::DONE, gid});
}); });
server.Register<RemoteCreateEdgeRpc>([this](const RemoteCreateEdgeReq &req) { server.Register<CreateVertexRpc>([this](const CreateVertexReq &req) {
gid::Gid gid = GetUpdates(vertex_updates_, req.member.tx_id)
.CreateVertex(req.member.labels, req.member.properties);
return std::make_unique<CreateVertexRes>(
CreateResult{UpdateResult::DONE, gid});
});
server.Register<CreateEdgeRpc>([this](const CreateEdgeReq &req) {
auto data = req.member; auto data = req.member;
auto creation_result = CreateEdge(data); auto creation_result = CreateEdge(data);
// If `from` and `to` are both on this worker, we handle it in this // If `from` and `to` are both on this worker, we handle it in this
// RPC call. Do it only if CreateEdge succeeded. // RPC call. Do it only if CreateEdge succeeded.
if (creation_result.result == RemoteUpdateResult::DONE && if (creation_result.result == UpdateResult::DONE &&
data.to.worker_id() == db_.WorkerId()) { data.to.worker_id() == db_.WorkerId()) {
auto to_delta = database::StateDelta::AddInEdge( auto to_delta = database::StateDelta::AddInEdge(
data.tx_id, data.to.gid(), {data.from, db_.WorkerId()}, data.tx_id, data.to.gid(), {data.from, db_.WorkerId()},
@ -226,47 +220,45 @@ RemoteUpdatesRpcServer::RemoteUpdatesRpcServer(
GetUpdates(vertex_updates_, data.tx_id).Emplace(to_delta); GetUpdates(vertex_updates_, data.tx_id).Emplace(to_delta);
} }
return std::make_unique<RemoteCreateEdgeRes>(creation_result); return std::make_unique<CreateEdgeRes>(creation_result);
}); });
server.Register<RemoteAddInEdgeRpc>([this](const RemoteAddInEdgeReq &req) { server.Register<AddInEdgeRpc>([this](const AddInEdgeReq &req) {
auto to_delta = database::StateDelta::AddInEdge( auto to_delta = database::StateDelta::AddInEdge(
req.member.tx_id, req.member.to, req.member.from, req.member.tx_id, req.member.to, req.member.from,
req.member.edge_address, req.member.edge_type); req.member.edge_address, req.member.edge_type);
auto result = auto result =
GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta); GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta);
return std::make_unique<RemoteAddInEdgeRes>(result); return std::make_unique<AddInEdgeRes>(result);
}); });
server.Register<RemoteRemoveVertexRpc>( server.Register<RemoveVertexRpc>([this](const RemoveVertexReq &req) {
[this](const RemoteRemoveVertexReq &req) { auto to_delta = database::StateDelta::RemoveVertex(
auto to_delta = database::StateDelta::RemoveVertex( req.member.tx_id, req.member.gid, req.member.check_empty);
req.member.tx_id, req.member.gid, req.member.check_empty); auto result =
auto result = GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta);
GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta); return std::make_unique<RemoveVertexRes>(result);
return std::make_unique<RemoteRemoveVertexRes>(result);
});
server.Register<RemoteRemoveEdgeRpc>([this](const RemoteRemoveEdgeReq &req) {
return std::make_unique<RemoteRemoveEdgeRes>(RemoveEdge(req.member));
}); });
server.Register<RemoteRemoveInEdgeRpc>( server.Register<RemoveEdgeRpc>([this](const RemoveEdgeReq &req) {
[this](const RemoteRemoveInEdgeReq &req) { return std::make_unique<RemoveEdgeRes>(RemoveEdge(req.member));
auto data = req.member; });
return std::make_unique<RemoteRemoveInEdgeRes>(
GetUpdates(vertex_updates_, data.tx_id) server.Register<RemoveInEdgeRpc>([this](const RemoveInEdgeReq &req) {
.Emplace(database::StateDelta::RemoveInEdge( auto data = req.member;
data.tx_id, data.vertex, data.edge_address))); return std::make_unique<RemoveInEdgeRes>(
}); GetUpdates(vertex_updates_, data.tx_id)
.Emplace(database::StateDelta::RemoveInEdge(data.tx_id, data.vertex,
data.edge_address)));
});
} }
RemoteUpdateResult RemoteUpdatesRpcServer::Apply(tx::transaction_id_t tx_id) { UpdateResult UpdatesRpcServer::Apply(tx::transaction_id_t tx_id) {
auto apply = [tx_id](auto &collection) { auto apply = [tx_id](auto &collection) {
auto access = collection.access(); auto access = collection.access();
auto found = access.find(tx_id); auto found = access.find(tx_id);
if (found == access.end()) { if (found == access.end()) {
return RemoteUpdateResult::DONE; return UpdateResult::DONE;
} }
auto result = found->second.Apply(); auto result = found->second.Apply();
access.remove(tx_id); access.remove(tx_id);
@ -275,12 +267,12 @@ RemoteUpdateResult RemoteUpdatesRpcServer::Apply(tx::transaction_id_t tx_id) {
auto vertex_result = apply(vertex_updates_); auto vertex_result = apply(vertex_updates_);
auto edge_result = apply(edge_updates_); auto edge_result = apply(edge_updates_);
if (vertex_result != RemoteUpdateResult::DONE) return vertex_result; if (vertex_result != UpdateResult::DONE) return vertex_result;
if (edge_result != RemoteUpdateResult::DONE) return edge_result; if (edge_result != UpdateResult::DONE) return edge_result;
return RemoteUpdateResult::DONE; return UpdateResult::DONE;
} }
void RemoteUpdatesRpcServer::ClearTransactionalCache( void UpdatesRpcServer::ClearTransactionalCache(
tx::transaction_id_t oldest_active) { tx::transaction_id_t oldest_active) {
auto vertex_access = vertex_updates_.access(); auto vertex_access = vertex_updates_.access();
for (auto &kv : vertex_access) { for (auto &kv : vertex_access) {
@ -298,17 +290,15 @@ void RemoteUpdatesRpcServer::ClearTransactionalCache(
// Gets/creates the TransactionUpdates for the given transaction. // Gets/creates the TransactionUpdates for the given transaction.
template <typename TAccessor> template <typename TAccessor>
RemoteUpdatesRpcServer::TransactionUpdates<TAccessor> UpdatesRpcServer::TransactionUpdates<TAccessor> &UpdatesRpcServer::GetUpdates(
&RemoteUpdatesRpcServer::GetUpdates(MapT<TAccessor> &updates, MapT<TAccessor> &updates, tx::transaction_id_t tx_id) {
tx::transaction_id_t tx_id) {
return updates.access() return updates.access()
.emplace(tx_id, std::make_tuple(tx_id), .emplace(tx_id, std::make_tuple(tx_id),
std::make_tuple(std::ref(db_), tx_id)) std::make_tuple(std::ref(db_), tx_id))
.first->second; .first->second;
} }
RemoteCreateResult RemoteUpdatesRpcServer::CreateEdge( CreateResult UpdatesRpcServer::CreateEdge(const CreateEdgeReqData &req) {
const RemoteCreateEdgeReqData &req) {
auto gid = GetUpdates(edge_updates_, req.tx_id) auto gid = GetUpdates(edge_updates_, req.tx_id)
.CreateEdge(req.from, req.to, req.edge_type); .CreateEdge(req.from, req.to, req.edge_type);
@ -319,22 +309,21 @@ RemoteCreateResult RemoteUpdatesRpcServer::CreateEdge(
return {result, gid}; return {result, gid};
} }
RemoteUpdateResult RemoteUpdatesRpcServer::RemoveEdge( UpdateResult UpdatesRpcServer::RemoveEdge(const RemoveEdgeData &data) {
const RemoteRemoveEdgeData &data) {
// Edge removal. // Edge removal.
auto deletion_delta = auto deletion_delta =
database::StateDelta::RemoveEdge(data.tx_id, data.edge_id); database::StateDelta::RemoveEdge(data.tx_id, data.edge_id);
auto result = GetUpdates(edge_updates_, data.tx_id).Emplace(deletion_delta); auto result = GetUpdates(edge_updates_, data.tx_id).Emplace(deletion_delta);
// Out-edge removal, for sure is local. // Out-edge removal, for sure is local.
if (result == RemoteUpdateResult::DONE) { if (result == UpdateResult::DONE) {
auto remove_out_delta = database::StateDelta::RemoveOutEdge( auto remove_out_delta = database::StateDelta::RemoveOutEdge(
data.tx_id, data.vertex_from_id, {data.edge_id, db_.WorkerId()}); data.tx_id, data.vertex_from_id, {data.edge_id, db_.WorkerId()});
result = GetUpdates(vertex_updates_, data.tx_id).Emplace(remove_out_delta); result = GetUpdates(vertex_updates_, data.tx_id).Emplace(remove_out_delta);
} }
// In-edge removal, might not be local. // In-edge removal, might not be local.
if (result == RemoteUpdateResult::DONE && if (result == UpdateResult::DONE &&
data.vertex_to_address.worker_id() == db_.WorkerId()) { data.vertex_to_address.worker_id() == db_.WorkerId()) {
auto remove_in_delta = database::StateDelta::RemoveInEdge( auto remove_in_delta = database::StateDelta::RemoveInEdge(
data.tx_id, data.vertex_to_address.gid(), data.tx_id, data.vertex_to_address.gid(),
@ -347,14 +336,13 @@ RemoteUpdateResult RemoteUpdatesRpcServer::RemoveEdge(
template <> template <>
VertexAccessor VertexAccessor
RemoteUpdatesRpcServer::TransactionUpdates<VertexAccessor>::FindAccessor( UpdatesRpcServer::TransactionUpdates<VertexAccessor>::FindAccessor(
gid::Gid gid) { gid::Gid gid) {
return db_accessor_.FindVertex(gid, false); return db_accessor_.FindVertex(gid, false);
} }
template <> template <>
EdgeAccessor EdgeAccessor UpdatesRpcServer::TransactionUpdates<EdgeAccessor>::FindAccessor(
RemoteUpdatesRpcServer::TransactionUpdates<EdgeAccessor>::FindAccessor(
gid::Gid gid) { gid::Gid gid) {
return db_accessor_.FindEdge(gid, false); return db_accessor_.FindEdge(gid, false);
} }

View File

@ -10,7 +10,7 @@
#include "database/graph_db.hpp" #include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp" #include "database/state_delta.hpp"
#include "distributed/remote_updates_rpc_messages.hpp" #include "distributed/updates_rpc_messages.hpp"
#include "query/typed_value.hpp" #include "query/typed_value.hpp"
#include "storage/edge_accessor.hpp" #include "storage/edge_accessor.hpp"
#include "storage/gid.hpp" #include "storage/gid.hpp"
@ -27,7 +27,7 @@ namespace distributed {
/// ///
/// Attempts to get serialization and update-after-delete errors to happen as /// Attempts to get serialization and update-after-delete errors to happen as
/// soon as possible during query execution (fail fast). /// soon as possible during query execution (fail fast).
class RemoteUpdatesRpcServer { class UpdatesRpcServer {
// Remote updates for one transaction. // Remote updates for one transaction.
template <typename TRecordAccessor> template <typename TRecordAccessor>
class TransactionUpdates { class TransactionUpdates {
@ -38,7 +38,7 @@ class RemoteUpdatesRpcServer {
/// Adds a delta and returns the result. Does not modify the state (data) of /// Adds a delta and returns the result. Does not modify the state (data) of
/// the graph element the update is for, but calls the `update` method to /// the graph element the update is for, but calls the `update` method to
/// fail-fast on serialization and update-after-delete errors. /// fail-fast on serialization and update-after-delete errors.
RemoteUpdateResult Emplace(const database::StateDelta &delta); UpdateResult Emplace(const database::StateDelta &delta);
/// Creates a new vertex and returns it's gid. /// Creates a new vertex and returns it's gid.
gid::Gid CreateVertex( gid::Gid CreateVertex(
@ -52,7 +52,7 @@ class RemoteUpdatesRpcServer {
storage::EdgeType edge_type); storage::EdgeType edge_type);
/// Applies all the deltas on the record. /// Applies all the deltas on the record.
RemoteUpdateResult Apply(); UpdateResult Apply();
auto &db_accessor() { return db_accessor_; } auto &db_accessor() { return db_accessor_; }
@ -69,13 +69,12 @@ class RemoteUpdatesRpcServer {
}; };
public: public:
RemoteUpdatesRpcServer(database::GraphDb &db, UpdatesRpcServer(database::GraphDb &db, communication::rpc::Server &server);
communication::rpc::Server &server);
/// Applies all existsing updates for the given transaction ID. If there are /// Applies all existsing updates for the given transaction ID. If there are
/// no updates for that transaction, nothing happens. Clears the updates cache /// no updates for that transaction, nothing happens. Clears the updates cache
/// after applying them, regardless of the result. /// after applying them, regardless of the result.
RemoteUpdateResult Apply(tx::transaction_id_t tx_id); UpdateResult Apply(tx::transaction_id_t tx_id);
/// Clears the cache of local transactions that have expired. The signature of /// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::CacheCleaner`. /// this method is dictated by `distributed::CacheCleaner`.
@ -96,10 +95,10 @@ class RemoteUpdatesRpcServer {
tx::transaction_id_t tx_id); tx::transaction_id_t tx_id);
// Performs edge creation for the given request. // Performs edge creation for the given request.
RemoteCreateResult CreateEdge(const RemoteCreateEdgeReqData &req); CreateResult CreateEdge(const CreateEdgeReqData &req);
// Performs edge removal for the given request. // Performs edge removal for the given request.
RemoteUpdateResult RemoveEdge(const RemoteRemoveEdgeData &data); UpdateResult RemoveEdge(const RemoveEdgeData &data);
}; };
} // namespace distributed } // namespace distributed

View File

@ -15,9 +15,9 @@
#include "glog/logging.h" #include "glog/logging.h"
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "distributed/remote_pull_rpc_clients.hpp" #include "distributed/pull_rpc_clients.hpp"
#include "distributed/remote_updates_rpc_clients.hpp" #include "distributed/updates_rpc_clients.hpp"
#include "distributed/remote_updates_rpc_server.hpp" #include "distributed/updates_rpc_server.hpp"
#include "query/context.hpp" #include "query/context.hpp"
#include "query/exceptions.hpp" #include "query/exceptions.hpp"
#include "query/frontend/ast/ast.hpp" #include "query/frontend/ast/ast.hpp"
@ -3014,7 +3014,7 @@ class RemotePuller {
RemotePuller(database::GraphDbAccessor &db, RemotePuller(database::GraphDbAccessor &db,
const std::vector<Symbol> &symbols, int64_t plan_id) const std::vector<Symbol> &symbols, int64_t plan_id)
: db_(db), symbols_(symbols), plan_id_(plan_id) { : db_(db), symbols_(symbols), plan_id_(plan_id) {
worker_ids_ = db_.db().remote_pull_clients().GetWorkerIds(); worker_ids_ = db_.db().pull_clients().GetWorkerIds();
// Remove master from the worker ids list. // Remove master from the worker ids list.
worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0)); worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0));
} }
@ -3022,7 +3022,7 @@ class RemotePuller {
void Initialize(Context &context) { void Initialize(Context &context) {
if (!remote_pulls_initialized_) { if (!remote_pulls_initialized_) {
for (auto &worker_id : worker_ids_) { for (auto &worker_id : worker_ids_) {
UpdateRemotePullForWorker(worker_id, context); UpdatePullForWorker(worker_id, context);
} }
remote_pulls_initialized_ = true; remote_pulls_initialized_ = true;
} }
@ -3052,30 +3052,30 @@ class RemotePuller {
auto remote_results = remote_pull.get(); auto remote_results = remote_pull.get();
switch (remote_results.pull_state) { switch (remote_results.pull_state) {
case distributed::RemotePullState::CURSOR_EXHAUSTED: case distributed::PullState::CURSOR_EXHAUSTED:
move_frames(worker_id, remote_results); move_frames(worker_id, remote_results);
remote_pulls_.erase(found_it); remote_pulls_.erase(found_it);
break; break;
case distributed::RemotePullState::CURSOR_IN_PROGRESS: case distributed::PullState::CURSOR_IN_PROGRESS:
move_frames(worker_id, remote_results); move_frames(worker_id, remote_results);
UpdateRemotePullForWorker(worker_id, context); UpdatePullForWorker(worker_id, context);
break; break;
case distributed::RemotePullState::SERIALIZATION_ERROR: case distributed::PullState::SERIALIZATION_ERROR:
throw mvcc::SerializationError( throw mvcc::SerializationError(
"Serialization error occured during PullRemote !"); "Serialization error occured during PullRemote !");
case distributed::RemotePullState::LOCK_TIMEOUT_ERROR: case distributed::PullState::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException( throw LockTimeoutException(
"LockTimeout error occured during PullRemote !"); "LockTimeout error occured during PullRemote !");
case distributed::RemotePullState::UPDATE_DELETED_ERROR: case distributed::PullState::UPDATE_DELETED_ERROR:
throw QueryRuntimeException( throw QueryRuntimeException(
"RecordDeleted error ocured during PullRemote !"); "RecordDeleted error ocured during PullRemote !");
case distributed::RemotePullState::RECONSTRUCTION_ERROR: case distributed::PullState::RECONSTRUCTION_ERROR:
throw query::ReconstructionException(); throw query::ReconstructionException();
case distributed::RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR: case distributed::PullState::UNABLE_TO_DELETE_VERTEX_ERROR:
throw RemoveAttachedVertexException(); throw RemoveAttachedVertexException();
case distributed::RemotePullState::HINTED_ABORT_ERROR: case distributed::PullState::HINTED_ABORT_ERROR:
throw HintedAbortError(); throw HintedAbortError();
case distributed::RemotePullState::QUERY_ERROR: case distributed::PullState::QUERY_ERROR:
throw QueryRuntimeException( throw QueryRuntimeException(
"Query runtime error occurred duing PullRemote !"); "Query runtime error occurred duing PullRemote !");
} }
@ -3119,15 +3119,14 @@ class RemotePuller {
database::GraphDbAccessor &db_; database::GraphDbAccessor &db_;
std::vector<Symbol> symbols_; std::vector<Symbol> symbols_;
int64_t plan_id_; int64_t plan_id_;
std::unordered_map<int, utils::Future<distributed::RemotePullData>> std::unordered_map<int, utils::Future<distributed::PullData>> remote_pulls_;
remote_pulls_;
std::unordered_map<int, std::vector<std::vector<query::TypedValue>>> std::unordered_map<int, std::vector<std::vector<query::TypedValue>>>
remote_results_; remote_results_;
std::vector<int> worker_ids_; std::vector<int> worker_ids_;
bool remote_pulls_initialized_ = false; bool remote_pulls_initialized_ = false;
void UpdateRemotePullForWorker(int worker_id, Context &context) { void UpdatePullForWorker(int worker_id, Context &context) {
remote_pulls_[worker_id] = db_.db().remote_pull_clients().RemotePull( remote_pulls_[worker_id] = db_.db().pull_clients().Pull(
db_, worker_id, plan_id_, context.parameters_, symbols_, false); db_, worker_id, plan_id_, context.parameters_, symbols_, false);
} }
}; };
@ -3258,12 +3257,11 @@ class SynchronizeCursor : public Cursor {
auto &db = context.db_accessor_.db(); auto &db = context.db_accessor_.db();
// Tell all workers to accumulate, only if there is a remote pull. // Tell all workers to accumulate, only if there is a remote pull.
std::vector<utils::Future<distributed::RemotePullData>> std::vector<utils::Future<distributed::PullData>> worker_accumulations;
worker_accumulations;
if (pull_remote_cursor_) { if (pull_remote_cursor_) {
for (auto worker_id : db.remote_pull_clients().GetWorkerIds()) { for (auto worker_id : db.pull_clients().GetWorkerIds()) {
if (worker_id == db.WorkerId()) continue; if (worker_id == db.WorkerId()) continue;
worker_accumulations.emplace_back(db.remote_pull_clients().RemotePull( worker_accumulations.emplace_back(db.pull_clients().Pull(
context.db_accessor_, worker_id, self_.pull_remote()->plan_id(), context.db_accessor_, worker_id, self_.pull_remote()->plan_id(),
context.parameters_, self_.pull_remote()->symbols(), true, 0)); context.parameters_, self_.pull_remote()->symbols(), true, 0));
} }
@ -3282,29 +3280,29 @@ class SynchronizeCursor : public Cursor {
// Wait for all workers to finish accumulation (first sync point). // Wait for all workers to finish accumulation (first sync point).
for (auto &accu : worker_accumulations) { for (auto &accu : worker_accumulations) {
switch (accu.get().pull_state) { switch (accu.get().pull_state) {
case distributed::RemotePullState::CURSOR_EXHAUSTED: case distributed::PullState::CURSOR_EXHAUSTED:
continue; continue;
case distributed::RemotePullState::CURSOR_IN_PROGRESS: case distributed::PullState::CURSOR_IN_PROGRESS:
throw QueryRuntimeException( throw QueryRuntimeException(
"Expected exhausted cursor after remote pull accumulate"); "Expected exhausted cursor after remote pull accumulate");
case distributed::RemotePullState::SERIALIZATION_ERROR: case distributed::PullState::SERIALIZATION_ERROR:
throw mvcc::SerializationError( throw mvcc::SerializationError(
"Failed to perform remote accumulate due to SerializationError"); "Failed to perform remote accumulate due to SerializationError");
case distributed::RemotePullState::UPDATE_DELETED_ERROR: case distributed::PullState::UPDATE_DELETED_ERROR:
throw QueryRuntimeException( throw QueryRuntimeException(
"Failed to perform remote accumulate due to RecordDeletedError"); "Failed to perform remote accumulate due to RecordDeletedError");
case distributed::RemotePullState::LOCK_TIMEOUT_ERROR: case distributed::PullState::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException( throw LockTimeoutException(
"Failed to perform remote accumulate due to " "Failed to perform remote accumulate due to "
"LockTimeoutException"); "LockTimeoutException");
case distributed::RemotePullState::RECONSTRUCTION_ERROR: case distributed::PullState::RECONSTRUCTION_ERROR:
throw QueryRuntimeException( throw QueryRuntimeException(
"Failed to perform remote accumulate due to ReconstructionError"); "Failed to perform remote accumulate due to ReconstructionError");
case distributed::RemotePullState::UNABLE_TO_DELETE_VERTEX_ERROR: case distributed::PullState::UNABLE_TO_DELETE_VERTEX_ERROR:
throw RemoveAttachedVertexException(); throw RemoveAttachedVertexException();
case distributed::RemotePullState::HINTED_ABORT_ERROR: case distributed::PullState::HINTED_ABORT_ERROR:
throw HintedAbortError(); throw HintedAbortError();
case distributed::RemotePullState::QUERY_ERROR: case distributed::PullState::QUERY_ERROR:
throw QueryRuntimeException( throw QueryRuntimeException(
"Failed to perform remote accumulate due to Query runtime error"); "Failed to perform remote accumulate due to Query runtime error");
} }
@ -3317,22 +3315,22 @@ class SynchronizeCursor : public Cursor {
// Make all the workers apply their deltas. // Make all the workers apply their deltas.
auto tx_id = context.db_accessor_.transaction_id(); auto tx_id = context.db_accessor_.transaction_id();
auto apply_futures = auto apply_futures =
db.remote_updates_clients().RemoteUpdateApplyAll(db.WorkerId(), tx_id); db.updates_clients().UpdateApplyAll(db.WorkerId(), tx_id);
db.remote_updates_server().Apply(tx_id); db.updates_server().Apply(tx_id);
for (auto &future : apply_futures) { for (auto &future : apply_futures) {
switch (future.get()) { switch (future.get()) {
case distributed::RemoteUpdateResult::SERIALIZATION_ERROR: case distributed::UpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError( throw mvcc::SerializationError(
"Failed to apply deferred updates due to SerializationError"); "Failed to apply deferred updates due to SerializationError");
case distributed::RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: case distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
throw RemoveAttachedVertexException(); throw RemoveAttachedVertexException();
case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR: case distributed::UpdateResult::UPDATE_DELETED_ERROR:
throw QueryRuntimeException( throw QueryRuntimeException(
"Failed to apply deferred updates due to RecordDeletedError"); "Failed to apply deferred updates due to RecordDeletedError");
case distributed::RemoteUpdateResult::LOCK_TIMEOUT_ERROR: case distributed::UpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException( throw LockTimeoutException(
"Failed to apply deferred update due to LockTimeoutException"); "Failed to apply deferred update due to LockTimeoutException");
case distributed::RemoteUpdateResult::DONE: case distributed::UpdateResult::DONE:
break; break;
} }
} }
@ -3340,7 +3338,7 @@ class SynchronizeCursor : public Cursor {
// If the command advanced, let the workers know. // If the command advanced, let the workers know.
if (self_.advance_command()) { if (self_.advance_command()) {
auto futures = auto futures =
db.remote_pull_clients().NotifyAllTransactionCommandAdvanced(tx_id); db.pull_clients().NotifyAllTransactionCommandAdvanced(tx_id);
for (auto &future : futures) future.wait(); for (auto &future : futures) future.wait();
} }
} }

View File

@ -17,7 +17,7 @@
#include "boost/serialization/shared_ptr.hpp" #include "boost/serialization/shared_ptr.hpp"
#include "boost/serialization/unique_ptr.hpp" #include "boost/serialization/unique_ptr.hpp"
#include "distributed/remote_pull_produce_rpc_messages.hpp" #include "distributed/pull_produce_rpc_messages.hpp"
#include "query/common.hpp" #include "query/common.hpp"
#include "query/frontend/ast/ast.hpp" #include "query/frontend/ast/ast.hpp"
#include "query/frontend/semantic/symbol.hpp" #include "query/frontend/semantic/symbol.hpp"
@ -2463,7 +2463,7 @@ class Union : public LogicalOperator {
/** /**
* An operator in distributed Memgraph that yields both local and remote (from * An operator in distributed Memgraph that yields both local and remote (from
* other workers) frames. Obtaining remote frames is done through RPC calls to * other workers) frames. Obtaining remote frames is done through RPC calls to
* `distributed::RemoteProduceRpcServer`s running on all the workers. * `distributed::ProduceRpcServer`s running on all the workers.
* *
* This operator aims to yield results as fast as possible and lose minimal * This operator aims to yield results as fast as possible and lose minimal
* time on data transfer. It gives no guarantees on result order. * time on data transfer. It gives no guarantees on result order.
@ -2514,7 +2514,7 @@ class PullRemote : public LogicalOperator {
* *
* Logic of the synchronize operator is: * Logic of the synchronize operator is:
* *
* 1. If there is a RemotePull, tell all the workers to pull on that plan and * 1. If there is a Pull, tell all the workers to pull on that plan and
* accumulate results without sending them to the master. This is async. * accumulate results without sending them to the master. This is async.
* 2. Accumulate local results, in parallel with 1. getting executed on workers. * 2. Accumulate local results, in parallel with 1. getting executed on workers.
* 3. Wait till the master and all the workers are done accumulating. * 3. Wait till the master and all the workers are done accumulating.
@ -2522,7 +2522,7 @@ class PullRemote : public LogicalOperator {
* 5. Tell all the workers to apply their updates. This is async. * 5. Tell all the workers to apply their updates. This is async.
* 6. Apply local updates, in parallel with 5. on the workers. * 6. Apply local updates, in parallel with 5. on the workers.
* 7. Notify workers that the command has advanced, if necessary. * 7. Notify workers that the command has advanced, if necessary.
* 8. Yield all the results, first local, then from RemotePull if available. * 8. Yield all the results, first local, then from Pull if available.
*/ */
class Synchronize : public LogicalOperator { class Synchronize : public LogicalOperator {
public: public:

View File

@ -2,8 +2,8 @@
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp" #include "database/state_delta.hpp"
#include "distributed/remote_data_manager.hpp" #include "distributed/data_manager.hpp"
#include "distributed/remote_updates_rpc_clients.hpp" #include "distributed/updates_rpc_clients.hpp"
#include "query/exceptions.hpp" #include "query/exceptions.hpp"
#include "storage/edge.hpp" #include "storage/edge.hpp"
#include "storage/record_accessor.hpp" #include "storage/record_accessor.hpp"
@ -148,13 +148,12 @@ bool RecordAccessor<TRecord>::Reconstruct() const {
// It's not possible that we have a global address for a graph element // It's not possible that we have a global address for a graph element
// that's local, because that is resolved in the constructor. // that's local, because that is resolved in the constructor.
// TODO in write queries it's possible the command has been advanced and // TODO in write queries it's possible the command has been advanced and
// we need to invalidate the RemoteCache and really get the latest stuff. // we need to invalidate the Cache and really get the latest stuff.
// But only do that after the command has been advanced. // But only do that after the command has been advanced.
auto &remote_cache = auto &cache = dba.db().data_manager().template Elements<TRecord>(
dba.db().remote_data_manager().template Elements<TRecord>( dba.transaction_id());
dba.transaction_id()); cache.FindSetOldNew(dba.transaction().id_, address_.worker_id(),
remote_cache.FindSetOldNew(dba.transaction().id_, address_.worker_id(), address_.gid(), old_, new_);
address_.gid(), old_, new_);
} }
current_ = old_ ? old_ : new_; current_ = old_ ? old_ : new_;
return old_ != nullptr || new_ != nullptr; return old_ != nullptr || new_ != nullptr;
@ -180,10 +179,9 @@ TRecord &RecordAccessor<TRecord>::update() const {
if (is_local()) { if (is_local()) {
new_ = address_.local()->update(t); new_ = address_.local()->update(t);
} else { } else {
auto &remote_cache = auto &cache = dba.db().data_manager().template Elements<TRecord>(
dba.db().remote_data_manager().template Elements<TRecord>( dba.transaction_id());
dba.transaction_id()); new_ = cache.FindNew(address_.gid());
new_ = remote_cache.FindNew(address_.gid());
} }
DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update"; DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update";
return *new_; return *new_;
@ -204,18 +202,18 @@ void RecordAccessor<TRecord>::SendDelta(
DCHECK(!is_local()) DCHECK(!is_local())
<< "Only a delta created on a remote accessor should be sent"; << "Only a delta created on a remote accessor should be sent";
auto result = db_accessor().db().remote_updates_clients().RemoteUpdate( auto result =
address().worker_id(), delta); db_accessor().db().updates_clients().Update(address().worker_id(), delta);
switch (result) { switch (result) {
case distributed::RemoteUpdateResult::DONE: case distributed::UpdateResult::DONE:
break; break;
case distributed::RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR: case distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
throw query::RemoveAttachedVertexException(); throw query::RemoveAttachedVertexException();
case distributed::RemoteUpdateResult::SERIALIZATION_ERROR: case distributed::UpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError(); throw mvcc::SerializationError();
case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR: case distributed::UpdateResult::UPDATE_DELETED_ERROR:
throw RecordDeletedError(); throw RecordDeletedError();
case distributed::RemoteUpdateResult::LOCK_TIMEOUT_ERROR: case distributed::UpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException("Lock timeout on remote worker"); throw LockTimeoutException("Lock timeout on remote worker");
} }
} }

View File

@ -5,7 +5,7 @@
#include "database/graph_db.hpp" #include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "distributed/remote_updates_rpc_server.hpp" #include "distributed/updates_rpc_server.hpp"
#include "storage/address_types.hpp" #include "storage/address_types.hpp"
#include "transactions/engine_master.hpp" #include "transactions/engine_master.hpp"
@ -91,9 +91,9 @@ class DistributedGraphDbTest : public ::testing::Test {
VertexAccessor to{to_addr, dba}; VertexAccessor to{to_addr, dba};
auto r_val = auto r_val =
dba.InsertEdge(from, to, dba.EdgeType(edge_type_name)).GlobalAddress(); dba.InsertEdge(from, to, dba.EdgeType(edge_type_name)).GlobalAddress();
master().remote_updates_server().Apply(dba.transaction_id()); master().updates_server().Apply(dba.transaction_id());
worker(1).remote_updates_server().Apply(dba.transaction_id()); worker(1).updates_server().Apply(dba.transaction_id());
worker(2).remote_updates_server().Apply(dba.transaction_id()); worker(2).updates_server().Apply(dba.transaction_id());
dba.Commit(); dba.Commit();
return r_val; return r_val;
} }

View File

@ -8,11 +8,11 @@
#include "distributed/coordination.hpp" #include "distributed/coordination.hpp"
#include "distributed/coordination_master.hpp" #include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp" #include "distributed/coordination_worker.hpp"
#include "distributed/data_rpc_clients.hpp"
#include "distributed/data_rpc_server.hpp"
#include "distributed/plan_consumer.hpp" #include "distributed/plan_consumer.hpp"
#include "distributed/plan_dispatcher.hpp" #include "distributed/plan_dispatcher.hpp"
#include "distributed/remote_data_rpc_clients.hpp" #include "distributed/pull_rpc_clients.hpp"
#include "distributed/remote_data_rpc_server.hpp"
#include "distributed/remote_pull_rpc_clients.hpp"
#include "distributed_common.hpp" #include "distributed_common.hpp"
#include "io/network/endpoint.hpp" #include "io/network/endpoint.hpp"
#include "query/frontend/ast/ast.hpp" #include "query/frontend/ast/ast.hpp"

View File

@ -44,7 +44,7 @@ class DistributedInterpretationTest : public DistributedGraphDbTest {
std::experimental::optional<query::Interpreter> interpreter_; std::experimental::optional<query::Interpreter> interpreter_;
}; };
TEST_F(DistributedInterpretationTest, RemotePullTest) { TEST_F(DistributedInterpretationTest, PullTest) {
auto results = Run("OPTIONAL MATCH(n) UNWIND(RANGE(0, 20)) AS X RETURN 1"); auto results = Run("OPTIONAL MATCH(n) UNWIND(RANGE(0, 20)) AS X RETURN 1");
ASSERT_EQ(results.size(), 3 * 21); ASSERT_EQ(results.size(), 3 * 21);
@ -54,7 +54,7 @@ TEST_F(DistributedInterpretationTest, RemotePullTest) {
} }
} }
TEST_F(DistributedInterpretationTest, RemotePullNoResultsTest) { TEST_F(DistributedInterpretationTest, PullNoResultsTest) {
auto results = Run("MATCH (n) RETURN n"); auto results = Run("MATCH (n) RETURN n");
ASSERT_EQ(results.size(), 0U); ASSERT_EQ(results.size(), 0U);
} }

View File

@ -8,11 +8,11 @@
#include "distributed/coordination.hpp" #include "distributed/coordination.hpp"
#include "distributed/coordination_master.hpp" #include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp" #include "distributed/coordination_worker.hpp"
#include "distributed/data_rpc_clients.hpp"
#include "distributed/data_rpc_server.hpp"
#include "distributed/plan_consumer.hpp" #include "distributed/plan_consumer.hpp"
#include "distributed/plan_dispatcher.hpp" #include "distributed/plan_dispatcher.hpp"
#include "distributed/remote_data_rpc_clients.hpp" #include "distributed/pull_rpc_clients.hpp"
#include "distributed/remote_data_rpc_server.hpp"
#include "distributed/remote_pull_rpc_clients.hpp"
#include "distributed_common.hpp" #include "distributed_common.hpp"
#include "io/network/endpoint.hpp" #include "io/network/endpoint.hpp"
#include "query/frontend/ast/ast.hpp" #include "query/frontend/ast/ast.hpp"
@ -31,7 +31,7 @@ DECLARE_int32(query_execution_time_sec);
using namespace distributed; using namespace distributed;
using namespace database; using namespace database;
TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { TEST_F(DistributedGraphDbTest, PullProduceRpc) {
GraphDbAccessor dba{master()}; GraphDbAccessor dba{master()};
Context ctx{dba}; Context ctx{dba};
SymbolGenerator symbol_generator{ctx.symbol_table_}; SymbolGenerator symbol_generator{ctx.symbol_table_};
@ -60,12 +60,11 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
std::vector<query::Symbol> symbols{ctx.symbol_table_[*x_ne]}; std::vector<query::Symbol> symbols{ctx.symbol_table_[*x_ne]};
auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba, auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba,
int worker_id) { int worker_id) {
return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id, return master().pull_clients().Pull(dba, worker_id, plan_id, params,
params, symbols, false, 3); symbols, false, 3);
}; };
auto expect_first_batch = [](auto &batch) { auto expect_first_batch = [](auto &batch) {
EXPECT_EQ(batch.pull_state, EXPECT_EQ(batch.pull_state, distributed::PullState::CURSOR_IN_PROGRESS);
distributed::RemotePullState::CURSOR_IN_PROGRESS);
ASSERT_EQ(batch.frames.size(), 3); ASSERT_EQ(batch.frames.size(), 3);
ASSERT_EQ(batch.frames[0].size(), 1); ASSERT_EQ(batch.frames[0].size(), 1);
EXPECT_EQ(batch.frames[0][0].ValueInt(), 42); EXPECT_EQ(batch.frames[0][0].ValueInt(), 42);
@ -73,7 +72,7 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
EXPECT_EQ(batch.frames[2][0].ValueString(), "bla"); EXPECT_EQ(batch.frames[2][0].ValueString(), "bla");
}; };
auto expect_second_batch = [](auto &batch) { auto expect_second_batch = [](auto &batch) {
EXPECT_EQ(batch.pull_state, distributed::RemotePullState::CURSOR_EXHAUSTED); EXPECT_EQ(batch.pull_state, distributed::PullState::CURSOR_EXHAUSTED);
ASSERT_EQ(batch.frames.size(), 2); ASSERT_EQ(batch.frames.size(), 2);
ASSERT_EQ(batch.frames[0].size(), 1); ASSERT_EQ(batch.frames[0].size(), 1);
EXPECT_EQ(batch.frames[0][0].ValueInt(), 1); EXPECT_EQ(batch.frames[0][0].ValueInt(), 1);
@ -95,7 +94,7 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
} }
} }
TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) { TEST_F(DistributedGraphDbTest, PullProduceRpcWithGraphElements) {
// Create some data on the master and both workers. Eeach edge (3 of them) and // Create some data on the master and both workers. Eeach edge (3 of them) and
// vertex (6 of them) will be uniquely identified with their worker id and // vertex (6 of them) will be uniquely identified with their worker id and
// sequence ID, so we can check we retrieved all. // sequence ID, so we can check we retrieved all.
@ -180,8 +179,8 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
ctx.symbol_table_[*return_m], p_sym}; ctx.symbol_table_[*return_m], p_sym};
auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba, auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba,
int worker_id) { int worker_id) {
return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id, return master().pull_clients().Pull(dba, worker_id, plan_id, params,
params, symbols, false, 3); symbols, false, 3);
}; };
auto future_w1_results = remote_pull(dba, 1); auto future_w1_results = remote_pull(dba, 1);
auto future_w2_results = remote_pull(dba, 2); auto future_w2_results = remote_pull(dba, 2);
@ -352,13 +351,13 @@ TEST_F(DistributedTransactionTimeout, Timeout) {
std::vector<query::Symbol> symbols{ctx.symbol_table_[*output]}; std::vector<query::Symbol> symbols{ctx.symbol_table_[*output]};
auto remote_pull = [this, &params, &symbols, &dba]() { auto remote_pull = [this, &params, &symbols, &dba]() {
return master() return master()
.remote_pull_clients() .pull_clients()
.RemotePull(dba, 1, plan_id, params, symbols, false, 1) .Pull(dba, 1, plan_id, params, symbols, false, 1)
.get() .get()
.pull_state; .pull_state;
}; };
ASSERT_EQ(remote_pull(), distributed::RemotePullState::CURSOR_IN_PROGRESS); ASSERT_EQ(remote_pull(), distributed::PullState::CURSOR_IN_PROGRESS);
// Sleep here so the remote gets a hinted error. // Sleep here so the remote gets a hinted error.
std::this_thread::sleep_for(2s); std::this_thread::sleep_for(2s);
EXPECT_EQ(remote_pull(), distributed::RemotePullState::HINTED_ABORT_ERROR); EXPECT_EQ(remote_pull(), distributed::PullState::HINTED_ABORT_ERROR);
} }

View File

@ -4,8 +4,8 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "distributed/remote_updates_rpc_clients.hpp" #include "distributed/updates_rpc_clients.hpp"
#include "distributed/remote_updates_rpc_server.hpp" #include "distributed/updates_rpc_server.hpp"
#include "query/typed_value.hpp" #include "query/typed_value.hpp"
#include "storage/property_value.hpp" #include "storage/property_value.hpp"
@ -53,14 +53,14 @@ class DistributedUpdateTest : public DistributedGraphDbTest {
EXPECT_EQ(var->has_label(label), new_result); \ EXPECT_EQ(var->has_label(label), new_result); \
} }
TEST_F(DistributedUpdateTest, RemoteUpdateLocalOnly) { TEST_F(DistributedUpdateTest, UpdateLocalOnly) {
EXPECT_LABEL(v1_dba2, false, true); EXPECT_LABEL(v1_dba2, false, true);
EXPECT_LABEL(v1_dba1, false, false); EXPECT_LABEL(v1_dba1, false, false);
} }
TEST_F(DistributedUpdateTest, RemoteUpdateApply) { TEST_F(DistributedUpdateTest, UpdateApply) {
EXPECT_LABEL(v1_dba1, false, false); EXPECT_LABEL(v1_dba1, false, false);
worker(1).remote_updates_server().Apply(dba1->transaction_id()); worker(1).updates_server().Apply(dba1->transaction_id());
EXPECT_LABEL(v1_dba1, false, true); EXPECT_LABEL(v1_dba1, false, true);
} }
@ -90,7 +90,7 @@ TEST_F(DistributedGraphDbTest, CreateVertexWithUpdate) {
gid = v.gid(); gid = v.gid();
prop = dba.Property("prop"); prop = dba.Property("prop");
v.PropsSet(prop, 42); v.PropsSet(prop, 42);
worker(2).remote_updates_server().Apply(dba.transaction_id()); worker(2).updates_server().Apply(dba.transaction_id());
dba.Commit(); dba.Commit();
} }
{ {
@ -119,7 +119,7 @@ TEST_F(DistributedGraphDbTest, CreateVertexWithData) {
EXPECT_TRUE(v.has_label(l2)); EXPECT_TRUE(v.has_label(l2));
EXPECT_EQ(v.PropsAt(prop).Value<int64_t>(), 42); EXPECT_EQ(v.PropsAt(prop).Value<int64_t>(), 42);
worker(2).remote_updates_server().Apply(dba.transaction_id()); worker(2).updates_server().Apply(dba.transaction_id());
dba.Commit(); dba.Commit();
} }
{ {
@ -156,9 +156,8 @@ TEST_F(DistributedGraphDbTest, UpdateVertexRemoteAndLocal) {
v_remote.add_label(l2); v_remote.add_label(l2);
v_local.add_label(l1); v_local.add_label(l1);
auto result = auto result = worker(1).updates_server().Apply(dba0.transaction_id());
worker(1).remote_updates_server().Apply(dba0.transaction_id()); EXPECT_EQ(result, distributed::UpdateResult::DONE);
EXPECT_EQ(result, distributed::RemoteUpdateResult::DONE);
} }
} }
@ -172,7 +171,7 @@ TEST_F(DistributedGraphDbTest, AddSameLabelRemoteAndLocal) {
auto l1 = dba1.Label("label"); auto l1 = dba1.Label("label");
v_remote.add_label(l1); v_remote.add_label(l1);
v_local.add_label(l1); v_local.add_label(l1);
worker(1).remote_updates_server().Apply(dba0.transaction_id()); worker(1).updates_server().Apply(dba0.transaction_id());
dba0.Commit(); dba0.Commit();
} }
{ {
@ -191,7 +190,7 @@ TEST_F(DistributedGraphDbTest, IndexGetsUpdatedRemotely) {
label = dba0.Label("label"); label = dba0.Label("label");
VertexAccessor va(v_remote, dba0); VertexAccessor va(v_remote, dba0);
va.add_label(label); va.add_label(label);
worker(1).remote_updates_server().Apply(dba0.transaction_id()); worker(1).updates_server().Apply(dba0.transaction_id());
dba0.Commit(); dba0.Commit();
} }
{ {
@ -208,8 +207,8 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteCommit) {
auto v_remote = VertexAccessor(v_address, dba0); auto v_remote = VertexAccessor(v_address, dba0);
dba0.RemoveVertex(v_remote); dba0.RemoveVertex(v_remote);
EXPECT_TRUE(dba1.FindVertexOptional(v_address.gid(), true)); EXPECT_TRUE(dba1.FindVertexOptional(v_address.gid(), true));
EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()), EXPECT_EQ(worker(1).updates_server().Apply(dba0.transaction_id()),
distributed::RemoteUpdateResult::DONE); distributed::UpdateResult::DONE);
EXPECT_FALSE(dba1.FindVertexOptional(v_address.gid(), true)); EXPECT_FALSE(dba1.FindVertexOptional(v_address.gid(), true));
} }
@ -222,8 +221,8 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteBothDelete) {
auto v_remote = VertexAccessor(v_address, dba0); auto v_remote = VertexAccessor(v_address, dba0);
EXPECT_TRUE(dba1.RemoveVertex(v_local)); EXPECT_TRUE(dba1.RemoveVertex(v_local));
EXPECT_TRUE(dba0.RemoveVertex(v_remote)); EXPECT_TRUE(dba0.RemoveVertex(v_remote));
EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()), EXPECT_EQ(worker(1).updates_server().Apply(dba0.transaction_id()),
distributed::RemoteUpdateResult::DONE); distributed::UpdateResult::DONE);
EXPECT_FALSE(dba1.FindVertexOptional(v_address.gid(), true)); EXPECT_FALSE(dba1.FindVertexOptional(v_address.gid(), true));
} }
} }
@ -237,8 +236,8 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteStillConnected) {
database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()}; database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()};
auto v_remote = VertexAccessor(v_address, dba0); auto v_remote = VertexAccessor(v_address, dba0);
dba0.RemoveVertex(v_remote); dba0.RemoveVertex(v_remote);
EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()), EXPECT_EQ(worker(1).updates_server().Apply(dba0.transaction_id()),
distributed::RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR); distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR);
EXPECT_TRUE(dba1.FindVertexOptional(v_address.gid(), true)); EXPECT_TRUE(dba1.FindVertexOptional(v_address.gid(), true));
} }
{ {
@ -251,8 +250,8 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteStillConnected) {
dba1.RemoveEdge(e_local); dba1.RemoveEdge(e_local);
dba0.RemoveVertex(v_remote); dba0.RemoveVertex(v_remote);
EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()), EXPECT_EQ(worker(1).updates_server().Apply(dba0.transaction_id()),
distributed::RemoteUpdateResult::DONE); distributed::UpdateResult::DONE);
EXPECT_FALSE(dba1.FindVertexOptional(v_address.gid(), true)); EXPECT_FALSE(dba1.FindVertexOptional(v_address.gid(), true));
} }
} }
@ -287,9 +286,9 @@ class DistributedDetachDeleteTest : public DistributedGraphDbTest {
accessor.DetachRemoveVertex(v_accessor); accessor.DetachRemoveVertex(v_accessor);
for (auto db_accessor : dba) { for (auto db_accessor : dba) {
ASSERT_EQ(db_accessor.get().db().remote_updates_server().Apply( ASSERT_EQ(db_accessor.get().db().updates_server().Apply(
dba[0].get().transaction_id()), dba[0].get().transaction_id()),
distributed::RemoteUpdateResult::DONE); distributed::UpdateResult::DONE);
} }
check_func(dba); check_func(dba);
@ -380,9 +379,9 @@ class DistributedEdgeCreateTest : public DistributedGraphDbTest {
for (auto &kv : props) edge.PropsSet(dba.Property(kv.first), kv.second); for (auto &kv : props) edge.PropsSet(dba.Property(kv.first), kv.second);
master().remote_updates_server().Apply(dba.transaction_id()); master().updates_server().Apply(dba.transaction_id());
worker(1).remote_updates_server().Apply(dba.transaction_id()); worker(1).updates_server().Apply(dba.transaction_id());
worker(2).remote_updates_server().Apply(dba.transaction_id()); worker(2).updates_server().Apply(dba.transaction_id());
dba.Commit(); dba.Commit();
} }
@ -487,9 +486,9 @@ class DistributedEdgeRemoveTest : public DistributedGraphDbTest {
database::GraphDbAccessor dba{db}; database::GraphDbAccessor dba{db};
EdgeAccessor edge{edge_addr, dba}; EdgeAccessor edge{edge_addr, dba};
dba.RemoveEdge(edge); dba.RemoveEdge(edge);
master().remote_updates_server().Apply(dba.transaction_id()); master().updates_server().Apply(dba.transaction_id());
worker(1).remote_updates_server().Apply(dba.transaction_id()); worker(1).updates_server().Apply(dba.transaction_id());
worker(2).remote_updates_server().Apply(dba.transaction_id()); worker(2).updates_server().Apply(dba.transaction_id());
dba.Commit(); dba.Commit();
} }