Add PlanDispatcher and PlanConsumer stubs

Reviewers: teon.banek, florijan

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1126
This commit is contained in:
Matija Santl 2018-01-22 15:24:04 +01:00
parent ca9fac8adc
commit 3bee31d8f3
7 changed files with 165 additions and 0 deletions

View File

@ -16,6 +16,8 @@ set(memgraph_src_files
database/state_delta.cpp database/state_delta.cpp
distributed/coordination_master.cpp distributed/coordination_master.cpp
distributed/coordination_worker.cpp distributed/coordination_worker.cpp
distributed/plan_consumer.cpp
distributed/plan_dispatcher.cpp
durability/paths.cpp durability/paths.cpp
durability/recovery.cpp durability/recovery.cpp
durability/snapshooter.cpp durability/snapshooter.cpp

View File

@ -4,6 +4,8 @@
#include "database/graph_db.hpp" #include "database/graph_db.hpp"
#include "distributed/coordination_master.hpp" #include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp" #include "distributed/coordination_worker.hpp"
#include "distributed/plan_consumer.hpp"
#include "distributed/plan_dispatcher.hpp"
#include "distributed/remote_data_rpc_clients.hpp" #include "distributed/remote_data_rpc_clients.hpp"
#include "distributed/remote_data_rpc_server.hpp" #include "distributed/remote_data_rpc_server.hpp"
#include "durability/paths.hpp" #include "durability/paths.hpp"
@ -109,6 +111,7 @@ class Master : public PrivateBase {
distributed::RemoteDataRpcServer remote_data_server_{*this, system_}; distributed::RemoteDataRpcServer remote_data_server_{*this, system_};
distributed::RemoteDataRpcClients remote_data_clients_{system_, distributed::RemoteDataRpcClients remote_data_clients_{system_,
coordination_}; coordination_};
distributed::PlanDispatcher plan_dispatcher{system_, coordination_};
}; };
class Worker : public PrivateBase { class Worker : public PrivateBase {
@ -134,6 +137,7 @@ class Worker : public PrivateBase {
distributed::RemoteDataRpcServer remote_data_server_{*this, system_}; distributed::RemoteDataRpcServer remote_data_server_{*this, system_};
distributed::RemoteDataRpcClients remote_data_clients_{system_, distributed::RemoteDataRpcClients remote_data_clients_{system_,
coordination_}; coordination_};
distributed::PlanConsumer plan_consumer{system_};
}; };
#undef IMPL_GETTERS #undef IMPL_GETTERS

View File

@ -0,0 +1,26 @@
#include "distributed/plan_consumer.hpp"
namespace distributed {
PlanConsumer::PlanConsumer(communication::messaging::System &system)
: server_(system, kDistributedPlanServerName) {
// TODO
}
pair<std::shared_ptr<query::plan::LogicalOperator>, SymbolTable>
PlanConsumer::PlanForId(int64_t plan_id) {
auto accessor = plan_cache_.access();
auto found = accessor.find(plan_id);
CHECK(found != accessor.end())
<< "Missing plan and symbol table for plan id!";
return found->second;
}
bool PlanConsumer::ConsumePlan(int64_t,
std::shared_ptr<query::plan::LogicalOperator>,
SymbolTable) {
// TODO
return false;
}
} // namespace distributed

View File

@ -0,0 +1,39 @@
#pragma once
#include "data_structures/concurrent/concurrent_map.hpp"
#include "distributed/plan_rpc_messages.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/plan/operator.hpp"
namespace distributed {
/** Handles plan consumption from master. Creates and holds a local cache of
* plans. Worker side.
*/
class PlanConsumer {
public:
explicit PlanConsumer(communication::messaging::System &system);
/**
* Return cached plan and symbol table for a given plan id.
*/
std::pair<std::shared_ptr<query::plan::LogicalOperator>, SymbolTable>
PlanForId(int64_t plan_id);
private:
/**
* Receives a plan and stores the given parameters in local cache. Returns
* true upon successful execution.
*/
bool ConsumePlan(int64_t plan_id,
std::shared_ptr<query::plan::LogicalOperator> plan,
SymbolTable symbol_table);
communication::rpc::Server server_;
mutable ConcurrentMap<
int64_t,
std::pair<std::shared_ptr<query::plan::LogicalOperator>, SymbolTable>>
plan_cache_;
};
} // namespace distributed

View File

@ -0,0 +1,16 @@
#include <distributed/plan_dispatcher.hpp>
namespace distributed {
PlanDispatcher::PlanDispatcher(communication::messaging::System &system,
Coordination &coordination)
: clients_(system, coordination, kDistributedPlanServerName) {}
void PlanDispatcher::DispatchPlan(int64_t,
std::shared_ptr<query::plan::LogicalOperator>,
SymbolTable &) {
// TODO
// NOTE: skip id 0 from clients_, it's the master id
}
} // namespace distributed

View File

@ -0,0 +1,32 @@
#pragma once
#include "communication/rpc/rpc.hpp"
#include "distributed/coordination.hpp"
#include "distributed/plan_rpc_messages.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/plan/operator.hpp"
namespace distributed {
/** Handles plan dispatching to all workers. Uses MasterCoordination to
* acomplish that. Master side.
*/
class PlanDispatcher {
public:
explicit PlanDispatcher(communication::messaging::System &system,
Coordination &coordination);
/**
* Synchronously dispatch a plan to all workers and wait for their
* acknowledgement.
*/
void DispatchPlan(int64_t plan_id,
std::shared_ptr<query::plan::LogicalOperator> plan,
SymbolTable &symbol_table);
private:
RpcWorkerClients clients_;
};
} // namespace distributed

View File

@ -0,0 +1,46 @@
#pragma once
#include "boost/serialization/access.hpp"
#include "boost/serialization/base_object.hpp"
#include "communication/messaging/local.hpp"
#include "communication/rpc/rpc.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/plan/operator.hpp"
#include "utils/rpc_pimp.hpp"
namespace distributed {
const std::string kDistributedPlanServerName = "DistributedPlanRpc";
using communication::messaging::Message;
using SymbolTable = query::SymbolTable;
struct DispatchPlanReq : public Message {
DispatchPlanReq() {}
DispatchPlanReq(int64_t plan_id,
std::shared_ptr<query::plan::LogicalOperator> plan,
SymbolTable symbol_table)
: plan_id(plan_id), plan(plan), symbol_table(symbol_table) {}
int64_t plan_id;
std::shared_ptr<query::plan::LogicalOperator> plan;
SymbolTable symbol_table;
private:
friend class boost::serialization::access;
template <class TArchive>
void serialize(TArchive &ar, unsigned int) {
ar &boost::serialization::base_object<Message>(*this);
ar &plan_id;
ar &plan;
ar &symbol_table;
}
};
RPC_SINGLE_MEMBER_MESSAGE(ConsumePlanRes, bool);
using DistributePlan =
communication::rpc::RequestResponse<DispatchPlanReq, ConsumePlanRes>;
} // namespace distributed