Implement distributed plan dispatcher/consumer methods

Summary:
Implementations of `DistributePlanRpc`.
I'll add tests afterwards #promise

Reviewers: teon.banek, florijan

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1131
This commit is contained in:
Matija Santl 2018-01-23 11:00:50 +01:00
parent 142b1f42b1
commit 62323965e3
7 changed files with 78 additions and 40 deletions

View File

@ -3,6 +3,7 @@
#include "boost/serialization/export.hpp" #include "boost/serialization/export.hpp"
#include "distributed/coordination_rpc_messages.hpp" #include "distributed/coordination_rpc_messages.hpp"
#include "distributed/plan_rpc_messages.hpp"
#include "distributed/remote_data_rpc_messages.hpp" #include "distributed/remote_data_rpc_messages.hpp"
#include "storage/concurrent_id_mapper_rpc_messages.hpp" #include "storage/concurrent_id_mapper_rpc_messages.hpp"
#include "transactions/engine_rpc_messages.hpp" #include "transactions/engine_rpc_messages.hpp"
@ -42,3 +43,7 @@ BOOST_CLASS_EXPORT(distributed::RemoteEdgeRes);
BOOST_CLASS_EXPORT(distributed::RemoteVertexReq); BOOST_CLASS_EXPORT(distributed::RemoteVertexReq);
BOOST_CLASS_EXPORT(distributed::RemoteVertexRes); BOOST_CLASS_EXPORT(distributed::RemoteVertexRes);
BOOST_CLASS_EXPORT(distributed::TxGidPair); BOOST_CLASS_EXPORT(distributed::TxGidPair);
// Distributed plan exchange.
BOOST_CLASS_EXPORT(distributed::DispatchPlanReq);
BOOST_CLASS_EXPORT(distributed::ConsumePlanRes);

View File

@ -83,6 +83,12 @@ class SingleNode : public PrivateBase {
distributed::RemoteDataRpcClients &remote_data_clients() override { distributed::RemoteDataRpcClients &remote_data_clients() override {
LOG(FATAL) << "Remote data clients not available in single-node."; LOG(FATAL) << "Remote data clients not available in single-node.";
} }
distributed::PlanDispatcher &plan_dispatcher() override {
LOG(FATAL) << "Plan Dispatcher not available in single-node.";
}
distributed::PlanConsumer &plan_consumer() override {
LOG(FATAL) << "Plan Consumer not available in single-node.";
}
}; };
#define IMPL_DISTRIBUTED_GETTERS \ #define IMPL_DISTRIBUTED_GETTERS \
@ -101,6 +107,12 @@ class Master : public PrivateBase {
} }
IMPL_GETTERS IMPL_GETTERS
IMPL_DISTRIBUTED_GETTERS IMPL_DISTRIBUTED_GETTERS
distributed::PlanDispatcher &plan_dispatcher() override {
return plan_dispatcher_;
}
distributed::PlanConsumer &plan_consumer() override {
LOG(FATAL) << "Plan Consumer not available in single-node.";
}
communication::messaging::System system_{config_.master_endpoint}; communication::messaging::System system_{config_.master_endpoint};
tx::MasterEngine tx_engine_{system_, &wal_}; tx::MasterEngine tx_engine_{system_, &wal_};
@ -111,7 +123,7 @@ class Master : public PrivateBase {
distributed::RemoteDataRpcServer remote_data_server_{*this, system_}; distributed::RemoteDataRpcServer remote_data_server_{*this, system_};
distributed::RemoteDataRpcClients remote_data_clients_{system_, distributed::RemoteDataRpcClients remote_data_clients_{system_,
coordination_}; coordination_};
distributed::PlanDispatcher plan_dispatcher{system_, coordination_}; distributed::PlanDispatcher plan_dispatcher_{system_, coordination_};
}; };
class Worker : public PrivateBase { class Worker : public PrivateBase {
@ -125,6 +137,10 @@ class Worker : public PrivateBase {
} }
IMPL_GETTERS IMPL_GETTERS
IMPL_DISTRIBUTED_GETTERS IMPL_DISTRIBUTED_GETTERS
distributed::PlanConsumer &plan_consumer() override { return plan_consumer_; }
distributed::PlanDispatcher &plan_dispatcher() override {
LOG(FATAL) << "Plan Dispatcher not available in single-node.";
}
communication::messaging::System system_{config_.worker_endpoint}; communication::messaging::System system_{config_.worker_endpoint};
distributed::WorkerCoordination coordination_{system_, distributed::WorkerCoordination coordination_{system_,
@ -137,7 +153,7 @@ class Worker : public PrivateBase {
distributed::RemoteDataRpcServer remote_data_server_{*this, system_}; distributed::RemoteDataRpcServer remote_data_server_{*this, system_};
distributed::RemoteDataRpcClients remote_data_clients_{system_, distributed::RemoteDataRpcClients remote_data_clients_{system_,
coordination_}; coordination_};
distributed::PlanConsumer plan_consumer{system_}; distributed::PlanConsumer plan_consumer_{system_};
}; };
#undef IMPL_GETTERS #undef IMPL_GETTERS
@ -185,6 +201,12 @@ distributed::RemoteDataRpcServer &PublicBase::remote_data_server() {
distributed::RemoteDataRpcClients &PublicBase::remote_data_clients() { distributed::RemoteDataRpcClients &PublicBase::remote_data_clients() {
return impl_->remote_data_clients(); return impl_->remote_data_clients();
} }
distributed::PlanDispatcher &PublicBase::plan_dispatcher() {
return impl_->plan_dispatcher();
}
distributed::PlanConsumer &PublicBase::plan_consumer() {
return impl_->plan_consumer();
}
void PublicBase::MakeSnapshot() { void PublicBase::MakeSnapshot() {
const bool status = durability::MakeSnapshot( const bool status = durability::MakeSnapshot(

View File

@ -14,8 +14,10 @@
#include "utils/scheduler.hpp" #include "utils/scheduler.hpp"
namespace distributed { namespace distributed {
class RemoteDataRpcServer; class RemoteDataRpcServer;
class RemoteDataRpcClients; class RemoteDataRpcClients;
class PlanDispatcher;
class PlanConsumer;
} }
namespace database { namespace database {
@ -66,11 +68,7 @@ struct Config {
*/ */
class GraphDb { class GraphDb {
public: public:
enum class Type { enum class Type { SINGLE_NODE, DISTRIBUTED_MASTER, DISTRIBUTED_WORKER };
SINGLE_NODE,
DISTRIBUTED_MASTER,
DISTRIBUTED_WORKER
};
GraphDb() {} GraphDb() {}
virtual ~GraphDb() {} virtual ~GraphDb() {}
@ -90,6 +88,8 @@ class GraphDb {
// Supported only in distributed master and worker, not in single-node. // Supported only in distributed master and worker, not in single-node.
virtual distributed::RemoteDataRpcServer &remote_data_server() = 0; virtual distributed::RemoteDataRpcServer &remote_data_server() = 0;
virtual distributed::RemoteDataRpcClients &remote_data_clients() = 0; virtual distributed::RemoteDataRpcClients &remote_data_clients() = 0;
virtual distributed::PlanDispatcher &plan_dispatcher() = 0;
virtual distributed::PlanConsumer &plan_consumer() = 0;
GraphDb(const GraphDb &) = delete; GraphDb(const GraphDb &) = delete;
GraphDb(GraphDb &&) = delete; GraphDb(GraphDb &&) = delete;
@ -119,6 +119,8 @@ class PublicBase : public GraphDb {
int WorkerId() const override; int WorkerId() const override;
distributed::RemoteDataRpcServer &remote_data_server() override; distributed::RemoteDataRpcServer &remote_data_server() override;
distributed::RemoteDataRpcClients &remote_data_clients() override; distributed::RemoteDataRpcClients &remote_data_clients() override;
distributed::PlanDispatcher &plan_dispatcher() override;
distributed::PlanConsumer &plan_consumer() override;
protected: protected:
explicit PublicBase(std::unique_ptr<PrivateBase> impl); explicit PublicBase(std::unique_ptr<PrivateBase> impl);

View File

@ -4,10 +4,14 @@ namespace distributed {
PlanConsumer::PlanConsumer(communication::messaging::System &system) PlanConsumer::PlanConsumer(communication::messaging::System &system)
: server_(system, kDistributedPlanServerName) { : server_(system, kDistributedPlanServerName) {
// TODO server_.Register<DistributedPlanRpc>([this](const DispatchPlanReq &req) {
plan_cache_.access().insert(req.plan_id_,
std::make_pair(req.plan_, req.symbol_table_));
return std::make_unique<ConsumePlanRes>(true);
});
} }
pair<std::shared_ptr<query::plan::LogicalOperator>, SymbolTable> std::pair<std::shared_ptr<query::plan::LogicalOperator>, SymbolTable>
PlanConsumer::PlanForId(int64_t plan_id) { PlanConsumer::PlanForId(int64_t plan_id) {
auto accessor = plan_cache_.access(); auto accessor = plan_cache_.access();
auto found = accessor.find(plan_id); auto found = accessor.find(plan_id);
@ -16,11 +20,4 @@ PlanConsumer::PlanForId(int64_t plan_id) {
return found->second; return found->second;
} }
bool PlanConsumer::ConsumePlan(int64_t,
std::shared_ptr<query::plan::LogicalOperator>,
SymbolTable) {
// TODO
return false;
}
} // namespace distributed } // namespace distributed

View File

@ -21,14 +21,6 @@ class PlanConsumer {
PlanForId(int64_t plan_id); PlanForId(int64_t plan_id);
private: private:
/**
* Receives a plan and stores the given parameters in local cache. Returns
* true upon successful execution.
*/
bool ConsumePlan(int64_t plan_id,
std::shared_ptr<query::plan::LogicalOperator> plan,
SymbolTable symbol_table);
communication::rpc::Server server_; communication::rpc::Server server_;
mutable ConcurrentMap< mutable ConcurrentMap<
int64_t, int64_t,

View File

@ -6,11 +6,15 @@ PlanDispatcher::PlanDispatcher(communication::messaging::System &system,
Coordination &coordination) Coordination &coordination)
: clients_(system, coordination, kDistributedPlanServerName) {} : clients_(system, coordination, kDistributedPlanServerName) {}
void PlanDispatcher::DispatchPlan(int64_t, void PlanDispatcher::DispatchPlan(
std::shared_ptr<query::plan::LogicalOperator>, int64_t plan_id, std::shared_ptr<query::plan::LogicalOperator> plan,
SymbolTable &) { SymbolTable &symbol_table) {
// TODO auto futures = clients_.ExecuteOnWorkers<void>(
// NOTE: skip id 0 from clients_, it's the master id 0, [plan_id, &plan, &symbol_table](communication::rpc::Client &client) {
auto result =
client.Call<DistributedPlanRpc>(300ms, plan_id, plan, symbol_table);
CHECK(result) << "Failed to dispatch plan to worker";
});
} }
} // namespace distributed } // namespace distributed

View File

@ -5,6 +5,7 @@
#include "communication/messaging/local.hpp" #include "communication/messaging/local.hpp"
#include "communication/rpc/rpc.hpp" #include "communication/rpc/rpc.hpp"
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/semantic/symbol_table.hpp" #include "query/frontend/semantic/symbol_table.hpp"
#include "query/plan/operator.hpp" #include "query/plan/operator.hpp"
#include "utils/rpc_pimp.hpp" #include "utils/rpc_pimp.hpp"
@ -15,32 +16,47 @@ const std::string kDistributedPlanServerName = "DistributedPlanRpc";
using communication::messaging::Message; using communication::messaging::Message;
using SymbolTable = query::SymbolTable; using SymbolTable = query::SymbolTable;
using AstTreeStorage = query::AstTreeStorage;
struct DispatchPlanReq : public Message { struct DispatchPlanReq : public Message {
DispatchPlanReq() {} DispatchPlanReq() {}
DispatchPlanReq(int64_t plan_id, DispatchPlanReq(int64_t plan_id,
std::shared_ptr<query::plan::LogicalOperator> plan, std::shared_ptr<query::plan::LogicalOperator> plan,
SymbolTable symbol_table) SymbolTable symbol_table)
: plan_id(plan_id), plan(plan), symbol_table(symbol_table) {}
int64_t plan_id; : plan_id_(plan_id), plan_(plan), symbol_table_(symbol_table) {}
std::shared_ptr<query::plan::LogicalOperator> plan; int64_t plan_id_;
SymbolTable symbol_table; std::shared_ptr<query::plan::LogicalOperator> plan_;
SymbolTable symbol_table_;
AstTreeStorage storage_;
private: private:
friend class boost::serialization::access; friend class boost::serialization::access;
BOOST_SERIALIZATION_SPLIT_MEMBER();
template <class TArchive> template <class TArchive>
void serialize(TArchive &ar, unsigned int) { void save(TArchive &ar, const unsigned int) const {
ar &boost::serialization::base_object<Message>(*this); ar &boost::serialization::base_object<Message>(*this);
ar &plan_id; ar &plan_id_;
ar &plan; ar &plan_;
ar &symbol_table; ar &symbol_table_;
}
template <class TArchive>
void load(TArchive &ar, const unsigned int) {
ar &boost::serialization::base_object<Message>(*this);
ar &plan_id_;
ar &plan_;
ar &symbol_table_;
storage_ = std::move(
ar.template get_helper<AstTreeStorage>(AstTreeStorage::kHelperId));
} }
}; };
RPC_SINGLE_MEMBER_MESSAGE(ConsumePlanRes, bool); RPC_SINGLE_MEMBER_MESSAGE(ConsumePlanRes, bool);
using DistributePlan = using DistributedPlanRpc =
communication::rpc::RequestResponse<DispatchPlanReq, ConsumePlanRes>; communication::rpc::RequestResponse<DispatchPlanReq, ConsumePlanRes>;
} // namespace distributed } // namespace distributed