diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp index 6ddd35421..b18ee14d5 100644 --- a/src/communication/rpc/messages-inl.hpp +++ b/src/communication/rpc/messages-inl.hpp @@ -3,6 +3,7 @@ #include "boost/serialization/export.hpp" #include "distributed/coordination_rpc_messages.hpp" +#include "distributed/plan_rpc_messages.hpp" #include "distributed/remote_data_rpc_messages.hpp" #include "storage/concurrent_id_mapper_rpc_messages.hpp" #include "transactions/engine_rpc_messages.hpp" @@ -42,3 +43,7 @@ BOOST_CLASS_EXPORT(distributed::RemoteEdgeRes); BOOST_CLASS_EXPORT(distributed::RemoteVertexReq); BOOST_CLASS_EXPORT(distributed::RemoteVertexRes); BOOST_CLASS_EXPORT(distributed::TxGidPair); + +// Distributed plan exchange. +BOOST_CLASS_EXPORT(distributed::DispatchPlanReq); +BOOST_CLASS_EXPORT(distributed::ConsumePlanRes); diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index ed3641b60..803bf6ede 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -83,6 +83,12 @@ class SingleNode : public PrivateBase { distributed::RemoteDataRpcClients &remote_data_clients() override { LOG(FATAL) << "Remote data clients not available in single-node."; } + distributed::PlanDispatcher &plan_dispatcher() override { + LOG(FATAL) << "Plan Dispatcher not available in single-node."; + } + distributed::PlanConsumer &plan_consumer() override { + LOG(FATAL) << "Plan Consumer not available in single-node."; + } }; #define IMPL_DISTRIBUTED_GETTERS \ @@ -101,6 +107,12 @@ class Master : public PrivateBase { } IMPL_GETTERS IMPL_DISTRIBUTED_GETTERS + distributed::PlanDispatcher &plan_dispatcher() override { + return plan_dispatcher_; + } + distributed::PlanConsumer &plan_consumer() override { + LOG(FATAL) << "Plan Consumer not available in single-node."; + } communication::messaging::System system_{config_.master_endpoint}; tx::MasterEngine tx_engine_{system_, &wal_}; @@ -111,7 +123,7 @@ class Master : public PrivateBase { distributed::RemoteDataRpcServer remote_data_server_{*this, system_}; distributed::RemoteDataRpcClients remote_data_clients_{system_, coordination_}; - distributed::PlanDispatcher plan_dispatcher{system_, coordination_}; + distributed::PlanDispatcher plan_dispatcher_{system_, coordination_}; }; class Worker : public PrivateBase { @@ -125,6 +137,10 @@ class Worker : public PrivateBase { } IMPL_GETTERS IMPL_DISTRIBUTED_GETTERS + distributed::PlanConsumer &plan_consumer() override { return plan_consumer_; } + distributed::PlanDispatcher &plan_dispatcher() override { + LOG(FATAL) << "Plan Dispatcher not available in single-node."; + } communication::messaging::System system_{config_.worker_endpoint}; distributed::WorkerCoordination coordination_{system_, @@ -137,7 +153,7 @@ class Worker : public PrivateBase { distributed::RemoteDataRpcServer remote_data_server_{*this, system_}; distributed::RemoteDataRpcClients remote_data_clients_{system_, coordination_}; - distributed::PlanConsumer plan_consumer{system_}; + distributed::PlanConsumer plan_consumer_{system_}; }; #undef IMPL_GETTERS @@ -185,6 +201,12 @@ distributed::RemoteDataRpcServer &PublicBase::remote_data_server() { distributed::RemoteDataRpcClients &PublicBase::remote_data_clients() { return impl_->remote_data_clients(); } +distributed::PlanDispatcher &PublicBase::plan_dispatcher() { + return impl_->plan_dispatcher(); +} +distributed::PlanConsumer &PublicBase::plan_consumer() { + return impl_->plan_consumer(); +} void PublicBase::MakeSnapshot() { const bool status = durability::MakeSnapshot( diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index e60edcc3a..a20da5fd8 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -14,8 +14,10 @@ #include "utils/scheduler.hpp" namespace distributed { - class RemoteDataRpcServer; - class RemoteDataRpcClients; +class RemoteDataRpcServer; +class RemoteDataRpcClients; +class PlanDispatcher; +class PlanConsumer; } namespace database { @@ -66,11 +68,7 @@ struct Config { */ class GraphDb { public: - enum class Type { - SINGLE_NODE, - DISTRIBUTED_MASTER, - DISTRIBUTED_WORKER - }; + enum class Type { SINGLE_NODE, DISTRIBUTED_MASTER, DISTRIBUTED_WORKER }; GraphDb() {} virtual ~GraphDb() {} @@ -90,6 +88,8 @@ class GraphDb { // Supported only in distributed master and worker, not in single-node. virtual distributed::RemoteDataRpcServer &remote_data_server() = 0; virtual distributed::RemoteDataRpcClients &remote_data_clients() = 0; + virtual distributed::PlanDispatcher &plan_dispatcher() = 0; + virtual distributed::PlanConsumer &plan_consumer() = 0; GraphDb(const GraphDb &) = delete; GraphDb(GraphDb &&) = delete; @@ -119,6 +119,8 @@ class PublicBase : public GraphDb { int WorkerId() const override; distributed::RemoteDataRpcServer &remote_data_server() override; distributed::RemoteDataRpcClients &remote_data_clients() override; + distributed::PlanDispatcher &plan_dispatcher() override; + distributed::PlanConsumer &plan_consumer() override; protected: explicit PublicBase(std::unique_ptr impl); diff --git a/src/distributed/plan_consumer.cpp b/src/distributed/plan_consumer.cpp index 13556f8e4..bd39d2f12 100644 --- a/src/distributed/plan_consumer.cpp +++ b/src/distributed/plan_consumer.cpp @@ -4,10 +4,14 @@ namespace distributed { PlanConsumer::PlanConsumer(communication::messaging::System &system) : server_(system, kDistributedPlanServerName) { - // TODO + server_.Register([this](const DispatchPlanReq &req) { + plan_cache_.access().insert(req.plan_id_, + std::make_pair(req.plan_, req.symbol_table_)); + return std::make_unique(true); + }); } -pair, SymbolTable> +std::pair, SymbolTable> PlanConsumer::PlanForId(int64_t plan_id) { auto accessor = plan_cache_.access(); auto found = accessor.find(plan_id); @@ -16,11 +20,4 @@ PlanConsumer::PlanForId(int64_t plan_id) { return found->second; } -bool PlanConsumer::ConsumePlan(int64_t, - std::shared_ptr, - SymbolTable) { - // TODO - return false; -} - } // namespace distributed diff --git a/src/distributed/plan_consumer.hpp b/src/distributed/plan_consumer.hpp index 740356308..99aebe2dc 100644 --- a/src/distributed/plan_consumer.hpp +++ b/src/distributed/plan_consumer.hpp @@ -21,14 +21,6 @@ class PlanConsumer { PlanForId(int64_t plan_id); private: - /** - * Receives a plan and stores the given parameters in local cache. Returns - * true upon successful execution. - */ - bool ConsumePlan(int64_t plan_id, - std::shared_ptr plan, - SymbolTable symbol_table); - communication::rpc::Server server_; mutable ConcurrentMap< int64_t, diff --git a/src/distributed/plan_dispatcher.cpp b/src/distributed/plan_dispatcher.cpp index fef1677e2..1a5ce756f 100644 --- a/src/distributed/plan_dispatcher.cpp +++ b/src/distributed/plan_dispatcher.cpp @@ -6,11 +6,15 @@ PlanDispatcher::PlanDispatcher(communication::messaging::System &system, Coordination &coordination) : clients_(system, coordination, kDistributedPlanServerName) {} -void PlanDispatcher::DispatchPlan(int64_t, - std::shared_ptr, - SymbolTable &) { - // TODO - // NOTE: skip id 0 from clients_, it's the master id +void PlanDispatcher::DispatchPlan( + int64_t plan_id, std::shared_ptr plan, + SymbolTable &symbol_table) { + auto futures = clients_.ExecuteOnWorkers( + 0, [plan_id, &plan, &symbol_table](communication::rpc::Client &client) { + auto result = + client.Call(300ms, plan_id, plan, symbol_table); + CHECK(result) << "Failed to dispatch plan to worker"; + }); } } // namespace distributed diff --git a/src/distributed/plan_rpc_messages.hpp b/src/distributed/plan_rpc_messages.hpp index 4dac74488..65a1f59db 100644 --- a/src/distributed/plan_rpc_messages.hpp +++ b/src/distributed/plan_rpc_messages.hpp @@ -5,6 +5,7 @@ #include "communication/messaging/local.hpp" #include "communication/rpc/rpc.hpp" +#include "query/frontend/ast/ast.hpp" #include "query/frontend/semantic/symbol_table.hpp" #include "query/plan/operator.hpp" #include "utils/rpc_pimp.hpp" @@ -15,32 +16,47 @@ const std::string kDistributedPlanServerName = "DistributedPlanRpc"; using communication::messaging::Message; using SymbolTable = query::SymbolTable; +using AstTreeStorage = query::AstTreeStorage; struct DispatchPlanReq : public Message { DispatchPlanReq() {} DispatchPlanReq(int64_t plan_id, std::shared_ptr plan, SymbolTable symbol_table) - : plan_id(plan_id), plan(plan), symbol_table(symbol_table) {} - int64_t plan_id; - std::shared_ptr plan; - SymbolTable symbol_table; + + : plan_id_(plan_id), plan_(plan), symbol_table_(symbol_table) {} + int64_t plan_id_; + std::shared_ptr plan_; + SymbolTable symbol_table_; + AstTreeStorage storage_; private: friend class boost::serialization::access; + BOOST_SERIALIZATION_SPLIT_MEMBER(); + template - void serialize(TArchive &ar, unsigned int) { + void save(TArchive &ar, const unsigned int) const { ar &boost::serialization::base_object(*this); - ar &plan_id; - ar &plan; - ar &symbol_table; + ar &plan_id_; + ar &plan_; + ar &symbol_table_; + } + + template + void load(TArchive &ar, const unsigned int) { + ar &boost::serialization::base_object(*this); + ar &plan_id_; + ar &plan_; + ar &symbol_table_; + storage_ = std::move( + ar.template get_helper(AstTreeStorage::kHelperId)); } }; RPC_SINGLE_MEMBER_MESSAGE(ConsumePlanRes, bool); -using DistributePlan = +using DistributedPlanRpc = communication::rpc::RequestResponse; } // namespace distributed