From 3bee31d8f3563e4c0384586377bce0d315781f50 Mon Sep 17 00:00:00 2001 From: Matija Santl Date: Mon, 22 Jan 2018 15:24:04 +0100 Subject: [PATCH] Add PlanDispatcher and PlanConsumer stubs Reviewers: teon.banek, florijan Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1126 --- src/CMakeLists.txt | 2 ++ src/database/graph_db.cpp | 4 +++ src/distributed/plan_consumer.cpp | 26 +++++++++++++++ src/distributed/plan_consumer.hpp | 39 +++++++++++++++++++++++ src/distributed/plan_dispatcher.cpp | 16 ++++++++++ src/distributed/plan_dispatcher.hpp | 32 +++++++++++++++++++ src/distributed/plan_rpc_messages.hpp | 46 +++++++++++++++++++++++++++ 7 files changed, 165 insertions(+) create mode 100644 src/distributed/plan_consumer.cpp create mode 100644 src/distributed/plan_consumer.hpp create mode 100644 src/distributed/plan_dispatcher.cpp create mode 100644 src/distributed/plan_dispatcher.hpp create mode 100644 src/distributed/plan_rpc_messages.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3054948f3..8f3aeaa9e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -16,6 +16,8 @@ set(memgraph_src_files database/state_delta.cpp distributed/coordination_master.cpp distributed/coordination_worker.cpp + distributed/plan_consumer.cpp + distributed/plan_dispatcher.cpp durability/paths.cpp durability/recovery.cpp durability/snapshooter.cpp diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 7b2e1a02d..ed3641b60 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -4,6 +4,8 @@ #include "database/graph_db.hpp" #include "distributed/coordination_master.hpp" #include "distributed/coordination_worker.hpp" +#include "distributed/plan_consumer.hpp" +#include "distributed/plan_dispatcher.hpp" #include "distributed/remote_data_rpc_clients.hpp" #include "distributed/remote_data_rpc_server.hpp" #include "durability/paths.hpp" @@ -109,6 +111,7 @@ class Master : public PrivateBase { distributed::RemoteDataRpcServer remote_data_server_{*this, system_}; distributed::RemoteDataRpcClients remote_data_clients_{system_, coordination_}; + distributed::PlanDispatcher plan_dispatcher{system_, coordination_}; }; class Worker : public PrivateBase { @@ -134,6 +137,7 @@ class Worker : public PrivateBase { distributed::RemoteDataRpcServer remote_data_server_{*this, system_}; distributed::RemoteDataRpcClients remote_data_clients_{system_, coordination_}; + distributed::PlanConsumer plan_consumer{system_}; }; #undef IMPL_GETTERS diff --git a/src/distributed/plan_consumer.cpp b/src/distributed/plan_consumer.cpp new file mode 100644 index 000000000..13556f8e4 --- /dev/null +++ b/src/distributed/plan_consumer.cpp @@ -0,0 +1,26 @@ +#include "distributed/plan_consumer.hpp" + +namespace distributed { + +PlanConsumer::PlanConsumer(communication::messaging::System &system) + : server_(system, kDistributedPlanServerName) { + // TODO +} + +pair, SymbolTable> +PlanConsumer::PlanForId(int64_t plan_id) { + auto accessor = plan_cache_.access(); + auto found = accessor.find(plan_id); + CHECK(found != accessor.end()) + << "Missing plan and symbol table for plan id!"; + return found->second; +} + +bool PlanConsumer::ConsumePlan(int64_t, + std::shared_ptr, + SymbolTable) { + // TODO + return false; +} + +} // namespace distributed diff --git a/src/distributed/plan_consumer.hpp b/src/distributed/plan_consumer.hpp new file mode 100644 index 000000000..740356308 --- /dev/null +++ b/src/distributed/plan_consumer.hpp @@ -0,0 +1,39 @@ +#pragma once + +#include "data_structures/concurrent/concurrent_map.hpp" +#include "distributed/plan_rpc_messages.hpp" +#include "query/frontend/semantic/symbol_table.hpp" +#include "query/plan/operator.hpp" + +namespace distributed { + +/** Handles plan consumption from master. Creates and holds a local cache of + * plans. Worker side. + */ +class PlanConsumer { + public: + explicit PlanConsumer(communication::messaging::System &system); + + /** + * Return cached plan and symbol table for a given plan id. + */ + std::pair, SymbolTable> + PlanForId(int64_t plan_id); + + private: + /** + * Receives a plan and stores the given parameters in local cache. Returns + * true upon successful execution. + */ + bool ConsumePlan(int64_t plan_id, + std::shared_ptr plan, + SymbolTable symbol_table); + + communication::rpc::Server server_; + mutable ConcurrentMap< + int64_t, + std::pair, SymbolTable>> + plan_cache_; +}; + +} // namespace distributed diff --git a/src/distributed/plan_dispatcher.cpp b/src/distributed/plan_dispatcher.cpp new file mode 100644 index 000000000..fef1677e2 --- /dev/null +++ b/src/distributed/plan_dispatcher.cpp @@ -0,0 +1,16 @@ +#include + +namespace distributed { + +PlanDispatcher::PlanDispatcher(communication::messaging::System &system, + Coordination &coordination) + : clients_(system, coordination, kDistributedPlanServerName) {} + +void PlanDispatcher::DispatchPlan(int64_t, + std::shared_ptr, + SymbolTable &) { + // TODO + // NOTE: skip id 0 from clients_, it's the master id +} + +} // namespace distributed diff --git a/src/distributed/plan_dispatcher.hpp b/src/distributed/plan_dispatcher.hpp new file mode 100644 index 000000000..751360809 --- /dev/null +++ b/src/distributed/plan_dispatcher.hpp @@ -0,0 +1,32 @@ +#pragma once + +#include "communication/rpc/rpc.hpp" +#include "distributed/coordination.hpp" +#include "distributed/plan_rpc_messages.hpp" +#include "distributed/rpc_worker_clients.hpp" +#include "query/frontend/semantic/symbol_table.hpp" +#include "query/plan/operator.hpp" + +namespace distributed { + +/** Handles plan dispatching to all workers. Uses MasterCoordination to + * acomplish that. Master side. + */ +class PlanDispatcher { + public: + explicit PlanDispatcher(communication::messaging::System &system, + Coordination &coordination); + + /** + * Synchronously dispatch a plan to all workers and wait for their + * acknowledgement. + */ + void DispatchPlan(int64_t plan_id, + std::shared_ptr plan, + SymbolTable &symbol_table); + + private: + RpcWorkerClients clients_; +}; + +} // namespace distributed diff --git a/src/distributed/plan_rpc_messages.hpp b/src/distributed/plan_rpc_messages.hpp new file mode 100644 index 000000000..4dac74488 --- /dev/null +++ b/src/distributed/plan_rpc_messages.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include "boost/serialization/access.hpp" +#include "boost/serialization/base_object.hpp" + +#include "communication/messaging/local.hpp" +#include "communication/rpc/rpc.hpp" +#include "query/frontend/semantic/symbol_table.hpp" +#include "query/plan/operator.hpp" +#include "utils/rpc_pimp.hpp" + +namespace distributed { + +const std::string kDistributedPlanServerName = "DistributedPlanRpc"; + +using communication::messaging::Message; +using SymbolTable = query::SymbolTable; + +struct DispatchPlanReq : public Message { + DispatchPlanReq() {} + DispatchPlanReq(int64_t plan_id, + std::shared_ptr plan, + SymbolTable symbol_table) + : plan_id(plan_id), plan(plan), symbol_table(symbol_table) {} + int64_t plan_id; + std::shared_ptr plan; + SymbolTable symbol_table; + + private: + friend class boost::serialization::access; + + template + void serialize(TArchive &ar, unsigned int) { + ar &boost::serialization::base_object(*this); + ar &plan_id; + ar &plan; + ar &symbol_table; + } +}; + +RPC_SINGLE_MEMBER_MESSAGE(ConsumePlanRes, bool); + +using DistributePlan = + communication::rpc::RequestResponse; + +} // namespace distributed