Example of starting simple distributed system
Reviewers: zuza, buda, lion Reviewed By: zuza Subscribers: lion, pullbot Differential Revision: https://phabricator.memgraph.io/D663
This commit is contained in:
parent
9e85ccc892
commit
23652fee15
@ -754,16 +754,16 @@ class System {
|
||||
return it_reactor->second.first->FindChannel(channel_name);
|
||||
}
|
||||
|
||||
/** Register process id to the given location. */
|
||||
void RegisterProcess(int64_t id, const std::string& address, uint16_t port) {
|
||||
processes_[id] = Location(address, port);
|
||||
/** Register memgraph node's id to the given location. */
|
||||
void RegisterMemgraphNode(int64_t id, const std::string& address, uint16_t port) {
|
||||
mnodes_[id] = Location(address, port);
|
||||
}
|
||||
|
||||
/** Finds channel using process's id. */
|
||||
const std::shared_ptr<Channel> FindChannel(int64_t process_id,
|
||||
/** Finds channel using memgrpah node's id. */
|
||||
const std::shared_ptr<Channel> FindChannel(int64_t mnid,
|
||||
const std::string &reactor_name,
|
||||
const std::string &channel_name) {
|
||||
const auto& location = processes_.at(process_id);
|
||||
const auto& location = mnodes_.at(mnid);
|
||||
if (network().Address() == location.first &&
|
||||
network().Port() == location.second)
|
||||
return FindLocalChannel(reactor_name, channel_name);
|
||||
@ -771,7 +771,7 @@ class System {
|
||||
channel_name);
|
||||
}
|
||||
|
||||
const auto& Processes() { return processes_; }
|
||||
const auto& Processes() { return mnodes_; }
|
||||
|
||||
void AwaitShutdown() {
|
||||
for (auto &key_value : reactors_) {
|
||||
@ -797,6 +797,6 @@ class System {
|
||||
std::unordered_map<std::string,
|
||||
std::pair<std::unique_ptr<Reactor>, std::thread>>
|
||||
reactors_;
|
||||
std::unordered_map<int64_t, Location> processes_;
|
||||
std::unordered_map<int64_t, Location> mnodes_;
|
||||
Network network_;
|
||||
};
|
||||
|
3
experimental/distributed/tests/config
Normal file
3
experimental/distributed/tests/config
Normal file
@ -0,0 +1,3 @@
|
||||
0 127.0.0.1 10000
|
||||
2 127.0.0.1 10001
|
||||
3 127.0.0.1 10002
|
123
experimental/distributed/tests/distributed.cpp
Normal file
123
experimental/distributed/tests/distributed.cpp
Normal file
@ -0,0 +1,123 @@
|
||||
#include <iostream>
|
||||
#include <fstream>
|
||||
|
||||
#include "communication.hpp"
|
||||
|
||||
DEFINE_int64(my_mnid, 0, "Memgraph node id");
|
||||
DEFINE_string(config_filename, "", "File containing list of all processes");
|
||||
|
||||
/**
|
||||
* About config file
|
||||
*
|
||||
* Each line contains three strings:
|
||||
* memgraph node id, ip address of the worker, and port of the worker
|
||||
* Data on the first line is used to start master.
|
||||
* Data on the remaining lines is used to start workers.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Parse config file and register processes into system.
|
||||
*
|
||||
* @return Pair (master mnid, list of worker's id).
|
||||
*/
|
||||
std::pair<int64_t, std::vector<int64_t>>
|
||||
ParseConfigAndRegister(const std::string& filename, System& system) {
|
||||
std::ifstream file(filename, std::ifstream::in);
|
||||
assert(file.good());
|
||||
int64_t master_mnid;
|
||||
std::vector<int64_t> worker_mnids;
|
||||
int64_t mnid;
|
||||
std::string address;
|
||||
uint16_t port;
|
||||
file >> master_mnid >> address >> port;
|
||||
system.RegisterMemgraphNode(master_mnid, address, port);
|
||||
while (!(file >> mnid).eof()) {
|
||||
file >> address >> port;
|
||||
system.RegisterMemgraphNode(mnid, address, port);
|
||||
worker_mnids.push_back(mnid);
|
||||
}
|
||||
file.close();
|
||||
return std::make_pair(master_mnid, worker_mnids);
|
||||
}
|
||||
|
||||
|
||||
class Master : public Reactor {
|
||||
public:
|
||||
Master(System* system, std::string name, int64_t mnid,
|
||||
std::vector<int64_t>&& worker_mnids)
|
||||
: Reactor(system, name), mnid_(mnid), worker_mnids_(std::move(worker_mnids)) {}
|
||||
|
||||
virtual void Run() {
|
||||
std::cout << "Master (" << mnid_ << ") @ " << system_->network().Address()
|
||||
<< ":" << system_->network().Port() << std::endl;
|
||||
|
||||
auto stream = main_.first;
|
||||
stream->OnEvent<SenderMessage>([this](const SenderMessage &msg,
|
||||
const EventStream::Subscription& subscription) {
|
||||
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n";
|
||||
++workers_seen;
|
||||
if (workers_seen == worker_mnids_.size()) {
|
||||
subscription.unsubscribe();
|
||||
std::this_thread::sleep_for(std::chrono::seconds(4));
|
||||
CloseConnector("main");
|
||||
}
|
||||
});
|
||||
|
||||
for (auto wmnid : worker_mnids_) {
|
||||
std::shared_ptr<Channel> channel;
|
||||
while (!(channel = system_->FindChannel(wmnid, "worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
channel->Send<SenderMessage>("master", "main");
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
int64_t workers_seen = 0;
|
||||
const int64_t mnid_;
|
||||
std::vector<int64_t> worker_mnids_;
|
||||
};
|
||||
|
||||
class Worker : public Reactor {
|
||||
public:
|
||||
Worker(System* system, std::string name, int64_t mnid, int64_t master_mnid)
|
||||
: Reactor(system, name), mnid_(mnid), master_mnid_(master_mnid) {}
|
||||
|
||||
virtual void Run() {
|
||||
std::cout << "Worker (" << mnid_ << ") @ " << system_->network().Address()
|
||||
<< ":" << system_->network().Port() << std::endl;
|
||||
|
||||
auto stream = main_.first;
|
||||
stream->OnEvent<SenderMessage>([this](const SenderMessage &msg,
|
||||
const EventStream::Subscription &subscription) {
|
||||
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n";
|
||||
subscription.unsubscribe();
|
||||
std::this_thread::sleep_for(std::chrono::seconds(4));
|
||||
CloseConnector("main");
|
||||
});
|
||||
|
||||
std::shared_ptr<Channel> channel;
|
||||
while(!(channel = system_->FindChannel(master_mnid_, "master", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
channel->Send<SenderMessage>("worker", "main");
|
||||
}
|
||||
|
||||
protected:
|
||||
const int64_t mnid_;
|
||||
const int64_t master_mnid_;
|
||||
};
|
||||
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
|
||||
System system;
|
||||
auto mnids = ParseConfigAndRegister(FLAGS_config_filename, system);
|
||||
system.StartServices();
|
||||
if (FLAGS_my_mnid == mnids.first)
|
||||
system.Spawn<Master>("master", FLAGS_my_mnid, std::move(mnids.second));
|
||||
else
|
||||
system.Spawn<Worker>("worker", FLAGS_my_mnid, mnids.first);
|
||||
system.AwaitShutdown();
|
||||
|
||||
return 0;
|
||||
}
|
21
experimental/distributed/tests/start_distributed.py
Executable file
21
experimental/distributed/tests/start_distributed.py
Executable file
@ -0,0 +1,21 @@
|
||||
# Unfortunately I don't know how to force CMake to copy this script to
|
||||
# the test folder so for now you will have to do it yourself.
|
||||
|
||||
import os
|
||||
|
||||
command = 'gnome-terminal'
|
||||
program = './distributed'
|
||||
config_filename = 'config'
|
||||
flags = ' --minloglevel 2'
|
||||
|
||||
f = open(config_filename, 'r')
|
||||
for line in f:
|
||||
data = line.strip().split(' ')
|
||||
my_mnid = data[0]
|
||||
address = data[1]
|
||||
port = data[2]
|
||||
call = program + flags + ' --my_mnid ' + my_mnid + ' --address ' + address +\
|
||||
' --port ' + port + ' --config_filename ' + config_filename
|
||||
command += " --tab -e '" + call + "'"
|
||||
|
||||
os.system(command)
|
Loading…
Reference in New Issue
Block a user