Create Distributed class for non-local functionality

Reviewers: zuza, lion, buda, mferencevic

Reviewed By: zuza

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D669
This commit is contained in:
Sasa Stanko 2017-08-17 14:46:34 +02:00
parent 6db9e38e1e
commit 061b8933a7
10 changed files with 309 additions and 206 deletions

View File

@ -253,7 +253,7 @@ class Master : public Reactor {
for (int64_t w_id = 0; w_id < NUM_WORKERS; ++w_id) {
if (channels_[w_id] == nullptr) {
// TODO: Resolve worker channel using the network service.
channels_[w_id] = system_->FindLocalChannel(GetWorkerName(w_id), "main");
channels_[w_id] = system_->FindChannel(GetWorkerName(w_id), "main");
if (channels_[w_id] != nullptr) ++workers_found;
}
}
@ -345,7 +345,7 @@ class Worker : public Reactor {
void FindMaster() {
// TODO: Replace with network service and channel resolution.
while (!(master_channel_ = system_->FindLocalChannel("master", "main")))
while (!(master_channel_ = system_->FindChannel("master", "main")))
std::this_thread::sleep_for(std::chrono::seconds(1));
}
@ -356,7 +356,7 @@ class Worker : public Reactor {
void ClientMain(System *system) {
std::shared_ptr<Channel> channel = nullptr;
// TODO: Replace this with network channel resolution.
while (!(channel = system->FindLocalChannel("master", "main")))
while (!(channel = system->FindChannel("master", "main")))
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "I/O Client Main active" << std::endl;
@ -378,7 +378,6 @@ int main(int argc, char *argv[]) {
//google::InitGoogleLogging(argv[0]);
gflags::ParseCommandLineFlags(&argc, &argv, true);
System system;
system.StartServices();
system.Spawn<Master>("master");
std::thread client(ClientMain, &system);
for (int i = 0; i < NUM_WORKERS; ++i)

View File

@ -10,11 +10,11 @@ void EventStream::Subscription::unsubscribe() const {
thread_local Reactor* current_reactor_ = nullptr;
std::string Connector::LocalChannel::Address() {
return system_->network().Address();
return FLAGS_address;
}
uint16_t Connector::LocalChannel::Port() {
return system_->network().Port();
return FLAGS_port;
}
std::string Connector::LocalChannel::ReactorName() {
@ -38,7 +38,7 @@ std::pair<EventStream*, std::shared_ptr<Channel>> Reactor::Open(const std::strin
+ "already exists");
}
auto it = connectors_.emplace(connector_name,
std::make_shared<Connector>(Connector::Params{system_, name_, connector_name, mutex_, cvar_})).first;
std::make_shared<Connector>(Connector::Params{name_, connector_name, mutex_, cvar_})).first;
it->second->self_ptr_ = it->second;
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
}
@ -50,7 +50,7 @@ std::pair<EventStream*, std::shared_ptr<Channel>> Reactor::Open() {
if (connectors_.count(connector_name) == 0) {
// Connector &queue = connectors_[connector_name];
auto it = connectors_.emplace(connector_name,
std::make_shared<Connector>(Connector::Params{system_, name_, connector_name, mutex_, cvar_})).first;
std::make_shared<Connector>(Connector::Params{name_, connector_name, mutex_, cvar_})).first;
it->second->self_ptr_ = it->second;
return make_pair(&it->second->stream_, it->second->LockedOpenChannel());
}
@ -158,10 +158,11 @@ std::string SenderMessage::ReactorName() const { return reactor_; }
std::string SenderMessage::ChannelName() const { return channel_; }
std::shared_ptr<Channel> SenderMessage::GetChannelToSender(
System *system) const {
if (address_ == system->network().Address() &&
port_ == system->network().Port()) {
return system->FindLocalChannel(reactor_, channel_);
System *system, Distributed *distributed) const {
if (address_ == FLAGS_address && port_ == FLAGS_port) {
return system->FindChannel(reactor_, channel_);
}
return system->network().Resolve(address_, port_, reactor_, channel_);
if (distributed)
return distributed->network().Resolve(address_, port_, reactor_, channel_);
assert(false);
}

View File

@ -205,7 +205,6 @@ class EventStream {
typedef std::function<void(const Message&, const Subscription&)> Callback;
private:
virtual void OnEventHelper(std::type_index tidx, Callback callback) = 0;
};
@ -227,8 +226,7 @@ class Connector {
friend class EventStream::Subscription;
Connector(Params params)
: system_(params.system),
connector_name_(params.connector_name),
: connector_name_(params.connector_name),
reactor_name_(params.reactor_name),
mutex_(params.mutex),
cvar_(params.cvar),
@ -247,12 +245,11 @@ class Connector {
friend class Connector;
LocalChannel(std::shared_ptr<std::mutex> mutex, std::string reactor_name,
std::string connector_name, std::weak_ptr<Connector> queue, System *system)
std::string connector_name, std::weak_ptr<Connector> queue)
: mutex_(mutex),
reactor_name_(reactor_name),
connector_name_(connector_name),
weak_queue_(queue),
system_(system) {}
weak_queue_(queue) {}
virtual void Send(std::unique_ptr<Message> m) {
std::shared_ptr<Connector> queue_ = weak_queue_.lock(); // Atomic, per the standard.
@ -276,7 +273,6 @@ class Connector {
std::string reactor_name_;
std::string connector_name_;
std::weak_ptr<Connector> weak_queue_;
System *system_;
};
/**
@ -321,7 +317,6 @@ private:
* Warning: do not forget to initialize self_ptr_ individually. Private because it shouldn't be created outside of a Reactor.
*/
struct Params {
System* system;
std::string reactor_name;
std::string connector_name;
std::shared_ptr<std::mutex> mutex;
@ -337,7 +332,7 @@ private:
std::shared_ptr<LocalChannel> LockedOpenChannel() {
assert(!self_ptr_.expired()); // TODO(zuza): fix this using this answer https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned
return std::make_shared<LocalChannel>(mutex_, reactor_name_, connector_name_, self_ptr_, system_);
return std::make_shared<LocalChannel>(mutex_, reactor_name_, connector_name_, self_ptr_);
}
std::unique_ptr<Message> LockedAwaitPop(std::unique_lock<std::mutex> &lock) {
@ -373,7 +368,6 @@ private:
assert(num_erased == 1);
}
System *system_;
std::string connector_name_;
std::string reactor_name_;
std::queue<std::unique_ptr<Message>> queue_;
@ -681,6 +675,8 @@ class Message {
}
};
class Distributed;
/**
* Message that includes the sender channel used to respond.
*/
@ -694,7 +690,8 @@ class SenderMessage : public Message {
std::string ReactorName() const;
std::string ChannelName() const;
std::shared_ptr<Channel> GetChannelToSender(System *system) const;
std::shared_ptr<Channel> GetChannelToSender(System *system,
Distributed *distributed = nullptr) const;
template <class Archive>
void serialize(Archive &ar) {
@ -716,19 +713,15 @@ CEREAL_REGISTER_TYPE(SenderMessage);
* E.g. holds set of reactors, channels for all reactors.
*/
class System {
using Location = std::pair<std::string, uint16_t>;
public:
friend class Reactor;
System() : network_(this) {}
System() {}
void operator=(const System &) = delete;
void StartServices() {
network_.StartClient(4);
network_.StartServer(4);
}
System(const System &) = delete;
System(System &&) = delete;
System &operator=(const System &) = delete;
System &operator=(System &&) = delete;
template <class ReactorType, class... Args>
const std::shared_ptr<Channel> Spawn(const std::string &name,
@ -746,7 +739,7 @@ class System {
return nullptr;
}
const std::shared_ptr<Channel> FindLocalChannel(const std::string &reactor_name,
const std::shared_ptr<Channel> FindChannel(const std::string &reactor_name,
const std::string &channel_name) {
std::unique_lock<std::recursive_mutex> lock(mutex_);
auto it_reactor = reactors_.find(reactor_name);
@ -754,36 +747,13 @@ class System {
return it_reactor->second.first->FindChannel(channel_name);
}
/** Register memgraph node's id to the given location. */
void RegisterMemgraphNode(int64_t id, const std::string& address, uint16_t port) {
mnodes_[id] = Location(address, port);
}
/** Finds channel using memgrpah node's id. */
const std::shared_ptr<Channel> FindChannel(int64_t mnid,
const std::string &reactor_name,
const std::string &channel_name) {
const auto& location = mnodes_.at(mnid);
if (network().Address() == location.first &&
network().Port() == location.second)
return FindLocalChannel(reactor_name, channel_name);
return network().Resolve(location.first, location.second, reactor_name,
channel_name);
}
const auto& Processes() { return mnodes_; }
void AwaitShutdown() {
for (auto &key_value : reactors_) {
auto &thread = key_value.second.second;
thread.join();
}
network_.StopClient();
network_.StopServer();
}
Network &network() { return network_; }
private:
void StartReactor(Reactor& reactor) {
current_reactor_ = &reactor;
@ -797,6 +767,83 @@ class System {
std::unordered_map<std::string,
std::pair<std::unique_ptr<Reactor>, std::thread>>
reactors_;
std::unordered_map<int64_t, Location> mnodes_;
};
/**
* Message that will arrive on a stream returned by Distributed::FindChannel
* once and if the channel is successfully resolved.
*/
class ChannelResolvedMessage : public Message {
public:
ChannelResolvedMessage() {}
ChannelResolvedMessage(std::shared_ptr<Channel> channel)
: Message(), channel_(channel) {}
std::shared_ptr<Channel> channel() const { return channel_; }
private:
std::shared_ptr<Channel> channel_;
};
/**
* Placeholder for all functionality related to non-local communication
* E.g. resolve remote channels by memgraph node id, etc.
*/
class Distributed {
public:
Distributed(System &system) : system_(system), network_(&system) {}
Distributed(const Distributed &) = delete;
Distributed(Distributed &&) = delete;
Distributed &operator=(const Distributed &) = delete;
Distributed &operator=(Distributed &&) = delete;
void StartServices() {
network_.StartClient(4);
network_.StartServer(4);
}
void StopServices() {
network_.StopClient();
network_.StopServer();
}
// TODO: Implement remote Spawn.
/**
* Resolves remote channel.
*
* TODO: Provide asynchronous implementation of this function.
*
* @return EventStream on which message will arrive once channel is resolved.
* @warning It can only be called from local Reactor.
*/
EventStream* FindChannel(const std::string &address,
uint16_t port,
const std::string &reactor_name,
const std::string &channel_name) {
std::shared_ptr<Channel> channel = nullptr;
while (!(channel = network_.Resolve(address, port, reactor_name, channel_name)))
std::this_thread::sleep_for(std::chrono::milliseconds(200));
auto stream_channel = current_reactor_->Open();
stream_channel.second->Send<ChannelResolvedMessage>(channel);
return stream_channel.first;
}
System &system() { return system_; }
Network &network() { return network_; }
protected:
System &system_;
Network network_;
};
class DistributedReactor : public Reactor {
public:
DistributedReactor(System *system, std::string name, Distributed &distributed)
: Reactor(system, name), distributed_(distributed) {}
protected:
Distributed &distributed_;
};

View File

@ -37,7 +37,7 @@ void Session::Execute() {
std::cout << "Reactor: " << reactor_ << "; Channel: " << channel_
<< std::endl;
auto channel = system_->FindLocalChannel(reactor_, channel_);
auto channel = system_->FindChannel(reactor_, channel_);
SendSuccess(channel != nullptr);
handshake_done_ = true;
@ -58,7 +58,7 @@ void Session::Execute() {
iarchive(message);
buffer_.Shift(len_data);
auto channel = system_->FindLocalChannel(reactor_, channel_);
auto channel = system_->FindChannel(reactor_, channel_);
if (channel == nullptr) {
SendSuccess(false);
return;

View File

@ -20,7 +20,6 @@ TEST(SystemTest, ReturnWithoutThrowing) {
};
System system;
ASSERT_NO_THROW(system.StartServices());
ASSERT_NO_THROW(system.Spawn<Master>("master"));
ASSERT_NO_THROW(system.AwaitShutdown());
}
@ -48,7 +47,7 @@ TEST(ConnectorSetUpTest, CheckMainChannelIsSet) {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindLocalChannel("worker", "main")))
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
CloseConnector("main");
@ -59,7 +58,7 @@ TEST(ConnectorSetUpTest, CheckMainChannelIsSet) {
Worker(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindLocalChannel("master", "main")))
while (!(channel = system_->FindChannel("master", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
CloseConnector("main");
@ -83,7 +82,7 @@ TEST(SimpleSendTest, OneSimpleSend) {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindLocalChannel("worker", "main")))
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(123);
CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII.
@ -118,7 +117,7 @@ TEST(SimpleSendTest, OneCallback) {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindLocalChannel("worker", "main")))
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(888);
CloseConnector("main");
@ -154,7 +153,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindLocalChannel("worker", "main")))
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(101);
channel->Send<MessageInt>(102); // should be ignored
@ -236,7 +235,7 @@ TEST(MultipleSendTest, UnsubscribeService) {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindLocalChannel("worker", "main")))
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(55);
channel->Send<MessageInt>(66);
@ -299,7 +298,7 @@ TEST(MultipleSendTest, OnEvent) {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindLocalChannel("worker", "main")))
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(101);
@ -357,7 +356,7 @@ TEST(MultipleSendTest, Chaining) {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindLocalChannel("worker", "main")))
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(55);
channel->Send<MessageInt>(66);
@ -408,7 +407,7 @@ TEST(MultipleSendTest, ChainingInRightOrder) {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindLocalChannel("worker", "main")))
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageChar>('a');
channel->Send<MessageInt>(55);
@ -457,7 +456,7 @@ TEST(MultipleSendTest, ProcessManyMessages) {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindLocalChannel("worker", "main")))
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));

View File

@ -1,123 +0,0 @@
#include <iostream>
#include <fstream>
#include "communication.hpp"
DEFINE_int64(my_mnid, 0, "Memgraph node id");
DEFINE_string(config_filename, "", "File containing list of all processes");
/**
* About config file
*
* Each line contains three strings:
* memgraph node id, ip address of the worker, and port of the worker
* Data on the first line is used to start master.
* Data on the remaining lines is used to start workers.
*/
/**
* Parse config file and register processes into system.
*
* @return Pair (master mnid, list of worker's id).
*/
std::pair<int64_t, std::vector<int64_t>>
ParseConfigAndRegister(const std::string& filename, System& system) {
std::ifstream file(filename, std::ifstream::in);
assert(file.good());
int64_t master_mnid;
std::vector<int64_t> worker_mnids;
int64_t mnid;
std::string address;
uint16_t port;
file >> master_mnid >> address >> port;
system.RegisterMemgraphNode(master_mnid, address, port);
while (!(file >> mnid).eof()) {
file >> address >> port;
system.RegisterMemgraphNode(mnid, address, port);
worker_mnids.push_back(mnid);
}
file.close();
return std::make_pair(master_mnid, worker_mnids);
}
class Master : public Reactor {
public:
Master(System* system, std::string name, int64_t mnid,
std::vector<int64_t>&& worker_mnids)
: Reactor(system, name), mnid_(mnid), worker_mnids_(std::move(worker_mnids)) {}
virtual void Run() {
std::cout << "Master (" << mnid_ << ") @ " << system_->network().Address()
<< ":" << system_->network().Port() << std::endl;
auto stream = main_.first;
stream->OnEvent<SenderMessage>([this](const SenderMessage &msg,
const EventStream::Subscription& subscription) {
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n";
++workers_seen;
if (workers_seen == worker_mnids_.size()) {
subscription.unsubscribe();
std::this_thread::sleep_for(std::chrono::seconds(4));
CloseConnector("main");
}
});
for (auto wmnid : worker_mnids_) {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel(wmnid, "worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(200));
channel->Send<SenderMessage>("master", "main");
}
}
protected:
int64_t workers_seen = 0;
const int64_t mnid_;
std::vector<int64_t> worker_mnids_;
};
class Worker : public Reactor {
public:
Worker(System* system, std::string name, int64_t mnid, int64_t master_mnid)
: Reactor(system, name), mnid_(mnid), master_mnid_(master_mnid) {}
virtual void Run() {
std::cout << "Worker (" << mnid_ << ") @ " << system_->network().Address()
<< ":" << system_->network().Port() << std::endl;
auto stream = main_.first;
stream->OnEvent<SenderMessage>([this](const SenderMessage &msg,
const EventStream::Subscription &subscription) {
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n";
subscription.unsubscribe();
std::this_thread::sleep_for(std::chrono::seconds(4));
CloseConnector("main");
});
std::shared_ptr<Channel> channel;
while(!(channel = system_->FindChannel(master_mnid_, "master", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(200));
channel->Send<SenderMessage>("worker", "main");
}
protected:
const int64_t mnid_;
const int64_t master_mnid_;
};
int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
System system;
auto mnids = ParseConfigAndRegister(FLAGS_config_filename, system);
system.StartServices();
if (FLAGS_my_mnid == mnids.first)
system.Spawn<Master>("master", FLAGS_my_mnid, std::move(mnids.second));
else
system.Spawn<Worker>("worker", FLAGS_my_mnid, mnids.first);
system.AwaitShutdown();
return 0;
}

View File

@ -0,0 +1,175 @@
#include <iostream>
#include <fstream>
#include "communication.hpp"
DEFINE_int64(my_mnid, 0, "Memgraph node id");
DEFINE_string(config_filename, "", "File containing list of all processes");
class MemgraphDistributed : public Distributed {
private:
using Location = std::pair<std::string, uint16_t>;
public:
MemgraphDistributed(System &system) : Distributed(system) {}
/** Register memgraph node id to the given location. */
void RegisterMemgraphNode(int64_t mnid, const std::string& address, uint16_t port) {
std::unique_lock<std::recursive_mutex> lock(mutex_);
mnodes_[mnid] = Location(address, port);
}
EventStream* FindChannel(int64_t mnid,
const std::string &reactor,
const std::string &channel) {
std::unique_lock<std::recursive_mutex> lock(mutex_);
const auto& location = mnodes_.at(mnid);
return Distributed::FindChannel(location.first, location.second, reactor, channel);
}
private:
std::recursive_mutex mutex_;
std::unordered_map<int64_t, Location> mnodes_;
};
/**
* About config file
*
* Each line contains three strings:
* memgraph node id, ip address of the worker, and port of the worker
* Data on the first line is used to start master.
* Data on the remaining lines is used to start workers.
*/
/**
* Parse config file and register processes into system.
*
* @return Pair (master mnid, list of worker's id).
*/
std::pair<int64_t, std::vector<int64_t>>
ParseConfigAndRegister(const std::string& filename,
MemgraphDistributed& distributed) {
std::ifstream file(filename, std::ifstream::in);
assert(file.good());
int64_t master_mnid;
std::vector<int64_t> worker_mnids;
int64_t mnid;
std::string address;
uint16_t port;
file >> master_mnid >> address >> port;
distributed.RegisterMemgraphNode(master_mnid, address, port);
while (file.good()) {
file >> mnid >> address >> port;
if (file.eof())
break ;
distributed.RegisterMemgraphNode(mnid, address, port);
worker_mnids.push_back(mnid);
}
file.close();
return std::make_pair(master_mnid, worker_mnids);
}
class MemgraphReactor : public Reactor {
public:
MemgraphReactor(System* system, std::string name,
MemgraphDistributed &distributed)
: Reactor(system, name), distributed_(distributed) {}
protected:
MemgraphDistributed &distributed_;
};
class Master : public MemgraphReactor {
public:
Master(System* system, std::string name, MemgraphDistributed &distributed,
int64_t mnid, std::vector<int64_t>&& worker_mnids)
: MemgraphReactor(system, name, distributed), mnid_(mnid),
worker_mnids_(std::move(worker_mnids)) {}
virtual void Run() {
std::cout << "Master (" << mnid_ << ") @ " << distributed_.network().Address()
<< ":" << distributed_.network().Port() << std::endl;
auto stream = main_.first;
stream->OnEvent<SenderMessage>([this](const SenderMessage &msg,
const EventStream::Subscription& subscription) {
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n";
++workers_seen;
if (workers_seen == worker_mnids_.size()) {
subscription.unsubscribe();
// Sleep for a while so we can read output in the terminal.
// (start_distributed.py runs each process in a new tab which is
// closed immediately after process has finished)
std::this_thread::sleep_for(std::chrono::seconds(4));
CloseConnector("main");
}
});
for (auto wmnid : worker_mnids_) {
auto stream = distributed_.FindChannel(wmnid, "worker", "main");
stream->OnEventOnce()
.ChainOnce<ChannelResolvedMessage>([this, stream](const ChannelResolvedMessage &msg){
msg.channel()->Send<SenderMessage>("master", "main");
stream->Close();
});
}
}
protected:
int64_t workers_seen = 0;
const int64_t mnid_;
std::vector<int64_t> worker_mnids_;
};
class Worker : public MemgraphReactor {
public:
Worker(System* system, std::string name, MemgraphDistributed &distributed,
int64_t mnid, int64_t master_mnid)
: MemgraphReactor(system, name, distributed), mnid_(mnid),
master_mnid_(master_mnid) {}
virtual void Run() {
std::cout << "Worker (" << mnid_ << ") @ " << distributed_.network().Address()
<< ":" << distributed_.network().Port() << std::endl;
auto stream = main_.first;
stream->OnEventOnce()
.ChainOnce<SenderMessage>([this](const SenderMessage &msg) {
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n";
// Sleep for a while so we can read output in the terminal.
std::this_thread::sleep_for(std::chrono::seconds(4));
CloseConnector("main");
});
auto remote_stream = distributed_.FindChannel(master_mnid_, "master", "main");
remote_stream->OnEventOnce()
.ChainOnce<ChannelResolvedMessage>([this, remote_stream](const ChannelResolvedMessage &msg){
msg.channel()->Send<SenderMessage>("worker", "main");
remote_stream->Close();
});
}
protected:
const int64_t mnid_;
const int64_t master_mnid_;
};
int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
System system;
MemgraphDistributed distributed(system);
auto mnids = ParseConfigAndRegister(FLAGS_config_filename, distributed);
distributed.StartServices();
if (FLAGS_my_mnid == mnids.first)
system.Spawn<Master>("master", distributed, FLAGS_my_mnid, std::move(mnids.second));
else
system.Spawn<Worker>("worker", distributed, FLAGS_my_mnid, mnids.first);
system.AwaitShutdown();
distributed.StopServices();
return 0;
}

View File

@ -36,9 +36,10 @@ class ChatACK : public ChatMessage {
};
CEREAL_REGISTER_TYPE(ChatACK);
class ChatServer : public Reactor {
class ChatServer : public DistributedReactor {
public:
ChatServer(System *system, std::string name) : Reactor(system, name) {}
ChatServer(System *system, std::string name, Distributed &distributed)
: DistributedReactor(system, name, distributed) {}
virtual void Run() {
std::cout << "ChatServer is active" << std::endl;
@ -63,9 +64,10 @@ class ChatServer : public Reactor {
}
};
class ChatClient : public Reactor {
class ChatClient : public DistributedReactor {
public:
ChatClient(System *system, std::string name) : Reactor(system, name) {}
ChatClient(System *system, std::string name, Distributed &distributed)
: DistributedReactor(system, name, distributed) {}
virtual void Run() {
std::cout << "ChatClient is active" << std::endl;
@ -77,7 +79,7 @@ class ChatClient : public Reactor {
std::cin >> address >> port >> message;
auto channel =
system_->network().Resolve(address, port, "server", "chat");
distributed_.network().Resolve(address, port, "server", "chat");
if (channel != nullptr) {
channel->Send<ChatMessage>("server", "chat", message);
} else {
@ -90,9 +92,11 @@ class ChatClient : public Reactor {
int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
System system;
system.StartServices();
system.Spawn<ChatServer>("server");
system.Spawn<ChatClient>("client");
Distributed distributed(system);
distributed.StartServices();
system.Spawn<ChatServer>("server", distributed);
system.Spawn<ChatClient>("client", distributed);
system.AwaitShutdown();
distributed.StopServices();
return 0;
}

View File

@ -3,12 +3,13 @@
int main(int argc, char *argv[]) {
google::InitGoogleLogging(argv[0]);
System system;
system.network().StartClient(1);
auto channel = system.network().Resolve("127.0.0.1", 10000, "master", "main");
Distributed distributed(system);
distributed.network().StartClient(1);
auto channel = distributed.network().Resolve("127.0.0.1", 10000, "master", "main");
std::cout << channel << std::endl;
if (channel != nullptr) {
channel->Send<SenderMessage>("master", "main");
}
system.network().StopClient();
distributed.network().StopClient();
return 0;
}

View File

@ -4,7 +4,7 @@
import os
command = 'gnome-terminal'
program = './distributed'
program = './distributed_test'
config_filename = 'config'
flags = ' --minloglevel 2'