Implement basic master and worker
Summary: Add FindChannel to System which works over the network Give example of master and worker Add script to start master and worker Rewrite worker and master Reviewers: zuza, buda Reviewed By: zuza Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D662
This commit is contained in:
parent
98a1b652d1
commit
43015f164c
@ -253,7 +253,7 @@ class Master : public Reactor {
|
||||
for (int64_t w_id = 0; w_id < NUM_WORKERS; ++w_id) {
|
||||
if (channels_[w_id] == nullptr) {
|
||||
// TODO: Resolve worker channel using the network service.
|
||||
channels_[w_id] = system_->FindChannel(GetWorkerName(w_id), "main");
|
||||
channels_[w_id] = system_->FindLocalChannel(GetWorkerName(w_id), "main");
|
||||
if (channels_[w_id] != nullptr) ++workers_found;
|
||||
}
|
||||
}
|
||||
@ -345,7 +345,7 @@ class Worker : public Reactor {
|
||||
|
||||
void FindMaster() {
|
||||
// TODO: Replace with network service and channel resolution.
|
||||
while (!(master_channel_ = system_->FindChannel("master", "main")))
|
||||
while (!(master_channel_ = system_->FindLocalChannel("master", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
}
|
||||
|
||||
@ -356,7 +356,7 @@ class Worker : public Reactor {
|
||||
void ClientMain(System *system) {
|
||||
std::shared_ptr<Channel> channel = nullptr;
|
||||
// TODO: Replace this with network channel resolution.
|
||||
while (!(channel = system->FindChannel("master", "main")))
|
||||
while (!(channel = system->FindLocalChannel("master", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
std::cout << "I/O Client Main active" << std::endl;
|
||||
|
||||
|
@ -161,7 +161,7 @@ std::shared_ptr<Channel> SenderMessage::GetChannelToSender(
|
||||
System *system) const {
|
||||
if (address_ == system->network().Address() &&
|
||||
port_ == system->network().Port()) {
|
||||
return system->FindChannel(reactor_, channel_);
|
||||
return system->FindLocalChannel(reactor_, channel_);
|
||||
}
|
||||
return system->network().Resolve(address_, port_, reactor_, channel_);
|
||||
}
|
||||
|
@ -716,6 +716,8 @@ CEREAL_REGISTER_TYPE(SenderMessage);
|
||||
* E.g. holds set of reactors, channels for all reactors.
|
||||
*/
|
||||
class System {
|
||||
using Location = std::pair<std::string, uint16_t>;
|
||||
|
||||
public:
|
||||
friend class Reactor;
|
||||
|
||||
@ -744,7 +746,7 @@ class System {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
const std::shared_ptr<Channel> FindChannel(const std::string &reactor_name,
|
||||
const std::shared_ptr<Channel> FindLocalChannel(const std::string &reactor_name,
|
||||
const std::string &channel_name) {
|
||||
std::unique_lock<std::recursive_mutex> lock(mutex_);
|
||||
auto it_reactor = reactors_.find(reactor_name);
|
||||
@ -752,6 +754,25 @@ class System {
|
||||
return it_reactor->second.first->FindChannel(channel_name);
|
||||
}
|
||||
|
||||
/** Register process id to the given location. */
|
||||
void RegisterProcess(int64_t id, const std::string& address, uint16_t port) {
|
||||
processes_[id] = Location(address, port);
|
||||
}
|
||||
|
||||
/** Finds channel using process's id. */
|
||||
const std::shared_ptr<Channel> FindChannel(int64_t process_id,
|
||||
const std::string &reactor_name,
|
||||
const std::string &channel_name) {
|
||||
const auto& location = processes_.at(process_id);
|
||||
if (network().Address() == location.first &&
|
||||
network().Port() == location.second)
|
||||
return FindLocalChannel(reactor_name, channel_name);
|
||||
return network().Resolve(location.first, location.second, reactor_name,
|
||||
channel_name);
|
||||
}
|
||||
|
||||
const auto& Processes() { return processes_; }
|
||||
|
||||
void AwaitShutdown() {
|
||||
for (auto &key_value : reactors_) {
|
||||
auto &thread = key_value.second.second;
|
||||
@ -776,5 +797,6 @@ class System {
|
||||
std::unordered_map<std::string,
|
||||
std::pair<std::unique_ptr<Reactor>, std::thread>>
|
||||
reactors_;
|
||||
std::unordered_map<int64_t, Location> processes_;
|
||||
Network network_;
|
||||
};
|
||||
|
@ -37,7 +37,7 @@ void Session::Execute() {
|
||||
std::cout << "Reactor: " << reactor_ << "; Channel: " << channel_
|
||||
<< std::endl;
|
||||
|
||||
auto channel = system_->FindChannel(reactor_, channel_);
|
||||
auto channel = system_->FindLocalChannel(reactor_, channel_);
|
||||
SendSuccess(channel != nullptr);
|
||||
|
||||
handshake_done_ = true;
|
||||
@ -58,7 +58,7 @@ void Session::Execute() {
|
||||
iarchive(message);
|
||||
buffer_.Shift(len_data);
|
||||
|
||||
auto channel = system_->FindChannel(reactor_, channel_);
|
||||
auto channel = system_->FindLocalChannel(reactor_, channel_);
|
||||
if (channel == nullptr) {
|
||||
SendSuccess(false);
|
||||
return;
|
||||
|
@ -48,7 +48,7 @@ TEST(ConnectorSetUpTest, CheckMainChannelIsSet) {
|
||||
Master(System *system, std::string name) : Reactor(system, name) {}
|
||||
virtual void Run() {
|
||||
std::shared_ptr<Channel> channel;
|
||||
while (!(channel = system_->FindChannel("worker", "main")))
|
||||
while (!(channel = system_->FindLocalChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
CloseConnector("main");
|
||||
@ -59,7 +59,7 @@ TEST(ConnectorSetUpTest, CheckMainChannelIsSet) {
|
||||
Worker(System *system, std::string name) : Reactor(system, name) {}
|
||||
virtual void Run() {
|
||||
std::shared_ptr<Channel> channel;
|
||||
while (!(channel = system_->FindChannel("master", "main")))
|
||||
while (!(channel = system_->FindLocalChannel("master", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
CloseConnector("main");
|
||||
@ -83,7 +83,7 @@ TEST(SimpleSendTest, OneSimpleSend) {
|
||||
Master(System *system, std::string name) : Reactor(system, name) {}
|
||||
virtual void Run() {
|
||||
std::shared_ptr<Channel> channel;
|
||||
while (!(channel = system_->FindChannel("worker", "main")))
|
||||
while (!(channel = system_->FindLocalChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
channel->Send<MessageInt>(123);
|
||||
CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII.
|
||||
@ -118,7 +118,7 @@ TEST(SimpleSendTest, OneCallback) {
|
||||
Master(System *system, std::string name) : Reactor(system, name) {}
|
||||
virtual void Run() {
|
||||
std::shared_ptr<Channel> channel;
|
||||
while (!(channel = system_->FindChannel("worker", "main")))
|
||||
while (!(channel = system_->FindLocalChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
channel->Send<MessageInt>(888);
|
||||
CloseConnector("main");
|
||||
@ -154,7 +154,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
|
||||
Master(System *system, std::string name) : Reactor(system, name) {}
|
||||
virtual void Run() {
|
||||
std::shared_ptr<Channel> channel;
|
||||
while (!(channel = system_->FindChannel("worker", "main")))
|
||||
while (!(channel = system_->FindLocalChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
channel->Send<MessageInt>(101);
|
||||
channel->Send<MessageInt>(102); // should be ignored
|
||||
@ -236,7 +236,7 @@ TEST(MultipleSendTest, UnsubscribeService) {
|
||||
Master(System *system, std::string name) : Reactor(system, name) {}
|
||||
virtual void Run() {
|
||||
std::shared_ptr<Channel> channel;
|
||||
while (!(channel = system_->FindChannel("worker", "main")))
|
||||
while (!(channel = system_->FindLocalChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
channel->Send<MessageInt>(55);
|
||||
channel->Send<MessageInt>(66);
|
||||
@ -299,7 +299,7 @@ TEST(MultipleSendTest, OnEvent) {
|
||||
Master(System *system, std::string name) : Reactor(system, name) {}
|
||||
virtual void Run() {
|
||||
std::shared_ptr<Channel> channel;
|
||||
while (!(channel = system_->FindChannel("worker", "main")))
|
||||
while (!(channel = system_->FindLocalChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
|
||||
channel->Send<MessageInt>(101);
|
||||
@ -357,7 +357,7 @@ TEST(MultipleSendTest, Chaining) {
|
||||
Master(System *system, std::string name) : Reactor(system, name) {}
|
||||
virtual void Run() {
|
||||
std::shared_ptr<Channel> channel;
|
||||
while (!(channel = system_->FindChannel("worker", "main")))
|
||||
while (!(channel = system_->FindLocalChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
channel->Send<MessageInt>(55);
|
||||
channel->Send<MessageInt>(66);
|
||||
@ -408,7 +408,7 @@ TEST(MultipleSendTest, ChainingInRightOrder) {
|
||||
Master(System *system, std::string name) : Reactor(system, name) {}
|
||||
virtual void Run() {
|
||||
std::shared_ptr<Channel> channel;
|
||||
while (!(channel = system_->FindChannel("worker", "main")))
|
||||
while (!(channel = system_->FindLocalChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
channel->Send<MessageChar>('a');
|
||||
channel->Send<MessageInt>(55);
|
||||
@ -457,7 +457,7 @@ TEST(MultipleSendTest, ProcessManyMessages) {
|
||||
Master(System *system, std::string name) : Reactor(system, name) {}
|
||||
virtual void Run() {
|
||||
std::shared_ptr<Channel> channel;
|
||||
while (!(channel = system_->FindChannel("worker", "main")))
|
||||
while (!(channel = system_->FindLocalChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
|
||||
|
Loading…
Reference in New Issue
Block a user