Make System a singleton + Fix some protocol errors

Reviewers: mferencevic, sasa.stanko

Reviewed By: mferencevic, sasa.stanko

Differential Revision: https://phabricator.memgraph.io/D693
This commit is contained in:
Goran Zuzic 2017-08-22 16:29:23 +02:00
parent 1a619c54e9
commit 8f33269b03
15 changed files with 624 additions and 568 deletions

View File

@ -28,11 +28,20 @@ file(GLOB_RECURSE src_files ${src_dir}/*.cpp)
add_library(distributed_lib STATIC ${src_files})
## executable
set(executable_name distributed)
add_executable(${executable_name} ${PROJECT_SOURCE_DIR}/main.cpp)
target_link_libraries(${executable_name} distributed_lib)
target_link_libraries(${executable_name} memgraph_lib)
target_link_libraries(${executable_name} ${MEMGRAPH_ALL_LIBS})
#### HACK: there is temporarily no working main file as the API is changing
# set(executable_name distributed)
# add_executable(${executable_name} ${PROJECT_SOURCE_DIR}/main.cpp)
# target_link_libraries(${executable_name} distributed_lib)
# target_link_libraries(${executable_name} memgraph_lib)
# target_link_libraries(${executable_name} ${MEMGRAPH_ALL_LIBS})
# tests
add_subdirectory(${PROJECT_SOURCE_DIR}/tests)
# copy test scripts into the build/ directory
configure_file(${PROJECT_SOURCE_DIR}/tests/start_distributed.py
${PROJECT_BINARY_DIR}/tests/start_distributed.py COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/config
${PROJECT_BINARY_DIR}/tests/config COPYONLY)

View File

@ -9,7 +9,7 @@ This subdirectory structure implements distributed infrastructure of Memgraph.
* Node: a computer that performs (distributed) work.
* Vertex: an abstract graph concept.
* Reactor: a unit of concurrent execution, lives on its own thread.
* Connector: a communication abstraction between Reactors. The reactors can be on the same machine or on different processes.
* Connector: a (one-way) communication abstraction between Reactors. The reactors can be on the same machine or on different processes.
* EventStream: read-end of a connector, is owned by exactly one Reactor/thread.
* Channel: write-end of a connector, can be owned (wrote into) by multiple threads.

View File

@ -1,387 +0,0 @@
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include "reactors_distributed.hpp"
const int NUM_WORKERS = 1;
class Txn : public SenderMessage {
public:
Txn(std::string reactor, std::string channel, int64_t id) : SenderMessage(reactor, channel), id_(id) {}
int64_t id() const { return id_; }
template <class Archive>
void serialize(Archive &archive) {
archive(cereal::base_class<SenderMessage>(this), id_);
}
private:
int64_t id_;
};
class CreateNodeTxn : public Txn {
public:
CreateNodeTxn(std::string reactor, std::string channel, int64_t id) : Txn(reactor, channel, id) {}
template <class Archive>
void serialize(Archive &archive) {
archive(cereal::base_class<Txn>(this));
}
};
class CountNodesTxn : public Txn {
public:
CountNodesTxn(std::string reactor, std::string channel, int64_t id) : Txn(reactor, channel, id) {}
template <class Archive>
void serialize(Archive &archive) {
archive(cereal::base_class<Txn>(this));
}
};
class CountNodesTxnResult : public Message {
public:
CountNodesTxnResult(int64_t count) : count_(count) {}
int64_t count() const { return count_; }
template <class Archive>
void serialize(Archive &archive) {
archive(count_);
}
private:
int64_t count_;
};
class CommitRequest : public SenderMessage {
public:
CommitRequest(std::string reactor, std::string channel, int64_t worker_id)
: SenderMessage(reactor, channel), worker_id_(worker_id) {}
int64_t worker_id() { return worker_id_; }
template <class Archive>
void serialize(Archive &archive) {
archive(cereal::base_class<SenderMessage>(this), worker_id_);
}
private:
int64_t worker_id_;
};
class AbortRequest : public SenderMessage {
public:
AbortRequest(std::string reactor, std::string channel, int64_t worker_id)
: SenderMessage(reactor, channel), worker_id_(worker_id) {}
int64_t worker_id() { return worker_id_; }
template <class Archive>
void serialize(Archive &archive) {
archive(cereal::base_class<SenderMessage>(this), worker_id_);
}
private:
int64_t worker_id_;
};
class CommitDirective : public Message {
template <class Archive>
void serialize(Archive &archive) {
archive(cereal::base_class<Message>(this));
}
};
class AbortDirective : public Message {
template <class Archive>
void serialize(Archive &archive) {
archive(cereal::base_class<Message>(this));
}
};
class Query : public Message {
public:
Query(std::string query) : Message(), query_(query) {}
std::string query() const { return query_; }
template <class Archive>
void serialize(Archive &archive) {
archive(cereal::base_class<Message>(this), query_);
}
private:
std::string query_;
};
class Quit : public Message {
template <class Archive>
void serialize(Archive &archive) {
archive(cereal::base_class<Message>(this));
}
};
class Master : public Reactor {
public:
Master(System *system, std::string name) : Reactor(system, name), next_xid_(1) {}
virtual void Run() {
auto stream = main_.first;
FindWorkers();
std::cout << "Master is active" << std::endl;
while (true) {
auto m = stream->AwaitEvent();
if (Query *query = dynamic_cast<Query *>(m.get())) {
ProcessQuery(query);
break; // process only the first query
} else if (SenderMessage *msg = dynamic_cast<SenderMessage *>(m.get())) {
std::cout << "SenderMessage received!" << std::endl;
std::cout << " Address: " << msg->Address() << std::endl;
std::cout << " Port: " << msg->Port() << std::endl;
std::cout << " Reactor: " << msg->ReactorName() << std::endl;
std::cout << " Channel: " << msg->ChannelName() << std::endl;
} else {
std::cerr << "unknown message\n";
exit(1);
}
}
stream->OnEvent<Message>([this](const Message &msg, const EventStream::Subscription& subscription) {
std::cout << "Processing Query via Callback" << std::endl;
const Query &query =
dynamic_cast<const Query &>(msg); // exception bad_cast
ProcessQuery(&query);
subscription.unsubscribe();
});
}
private:
void ProcessQuery(const Query *query) {
if (query->query() == "create node") {
PerformCreateNode();
} else if (query->query() == "count nodes") {
PerformCountNodes();
} else {
std::cout << "got query: " << query->query() << std::endl;
}
}
void PerformCreateNode() {
int worker_id = rand() % NUM_WORKERS;
int64_t xid = GetTransactionId();
std::string txn_channel_name = GetTxnName(xid);
auto connector = Open(txn_channel_name);
auto stream = connector.first;
channels_[worker_id]->Send<CreateNodeTxn>("master", "main", xid);
auto m = stream->AwaitEvent();
if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) {
req->GetChannelToSender(system_)->Send<CommitDirective>();
} else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) {
req->GetChannelToSender(system_)->Send<AbortDirective>();
} else {
std::cerr << "unknown message\n";
exit(1);
}
CloseConnector(txn_channel_name);
}
void PerformCountNodes() {
int64_t xid = GetTransactionId();
std::string txn_channel_name = GetTxnName(xid);
auto connector = Open(txn_channel_name);
auto stream = connector.first;
for (int w_id = 0; w_id < NUM_WORKERS; ++w_id)
channels_[w_id]->Send<CountNodesTxn>("master", "main", xid);
std::vector<std::shared_ptr<Channel>> txn_channels;
txn_channels.resize(NUM_WORKERS, nullptr);
bool commit = true;
for (int responds = 0; responds < NUM_WORKERS; ++responds) {
auto m = stream->AwaitEvent();
if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) {
txn_channels[req->worker_id()] = req->GetChannelToSender(system_);
commit &= true;
} else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) {
txn_channels[req->worker_id()] = req->GetChannelToSender(system_);
commit = false;
} else {
std::cerr << "unknown message\n";
exit(1);
}
}
if (commit) {
for (int w_id = 0; w_id < NUM_WORKERS; ++w_id)
txn_channels[w_id]->Send<CommitDirective>();
} else {
for (int w_id = 0; w_id < NUM_WORKERS; ++w_id)
txn_channels[w_id]->Send<AbortDirective>();
}
int64_t count = 0;
for (int responds = 0; responds < NUM_WORKERS; ++responds) {
auto m = stream->AwaitEvent();
if (CountNodesTxnResult *cnt =
dynamic_cast<CountNodesTxnResult *>(m.get())) {
count += cnt->count();
} else {
std::cerr << "unknown message\n";
exit(1);
}
}
CloseConnector(txn_channel_name);
std::cout << "graph has " << count << " vertices" << std::endl;
}
int64_t GetTransactionId() { return next_xid_++; }
std::string GetWorkerName(int worker_id) {
return "worker" + std::to_string(worker_id);
}
std::string GetTxnName(int txn_id) { return "txn" + std::to_string(txn_id); }
void FindWorkers() {
channels_.resize(NUM_WORKERS, nullptr);
int workers_found = 0;
while (workers_found < NUM_WORKERS) {
for (int64_t w_id = 0; w_id < NUM_WORKERS; ++w_id) {
if (channels_[w_id] == nullptr) {
// TODO: Resolve worker channel using the network service.
channels_[w_id] = system_->FindChannel(GetWorkerName(w_id), "main");
if (channels_[w_id] != nullptr) ++workers_found;
}
}
if (workers_found < NUM_WORKERS)
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
// TODO: Why is master atomic, it should be unique?
std::atomic<int64_t> next_xid_;
std::vector<std::shared_ptr<Channel>> channels_;
};
class Worker : public Reactor {
public:
Worker(System *system, std::string name, int64_t id) : Reactor(system, name),
worker_id_(id) {}
virtual void Run() {
std::cout << "worker " << worker_id_ << " is active" << std::endl;
auto stream = main_.first;
FindMaster();
while (true) {
auto m = stream->AwaitEvent();
if (Txn *txn = dynamic_cast<Txn *>(m.get())) {
HandleTransaction(txn);
} else {
std::cerr << "unknown message\n";
exit(1);
}
}
}
private:
void HandleTransaction(Txn *txn) {
if (CreateNodeTxn *create_txn = dynamic_cast<CreateNodeTxn *>(txn)) {
HandleCreateNode(create_txn);
} else if (CountNodesTxn *cnt_txn = dynamic_cast<CountNodesTxn *>(txn)) {
HandleCountNodes(cnt_txn);
} else {
std::cerr << "unknown transaction\n";
exit(1);
}
}
void HandleCreateNode(CreateNodeTxn *txn) {
auto connector = Open(GetTxnChannelName(txn->id()));
auto stream = connector.first;
auto masterChannel = txn->GetChannelToSender(system_);
// TODO: Do the actual commit.
masterChannel->Send<CommitRequest>("master", "main", worker_id_);
auto m = stream->AwaitEvent();
if (dynamic_cast<CommitDirective *>(m.get())) {
// TODO: storage_.CreateNode();
} else if (dynamic_cast<AbortDirective *>(m.get())) {
// TODO: Rollback.
} else {
std::cerr << "unknown message\n";
exit(1);
}
CloseConnector(GetTxnChannelName(txn->id()));
}
void HandleCountNodes(CountNodesTxn *txn) {
auto connector = Open(GetTxnChannelName(txn->id()));
auto stream = connector.first;
auto masterChannel = txn->GetChannelToSender(system_);
// TODO: Fix this hack -- use the storage.
int num = 123;
masterChannel->Send<CommitRequest>("master", "main", worker_id_);
auto m = stream->AwaitEvent();
if (dynamic_cast<CommitDirective *>(m.get())) {
masterChannel->Send<CountNodesTxnResult>(num);
} else if (dynamic_cast<AbortDirective *>(m.get())) {
// send nothing
} else {
std::cerr << "unknown message\n";
exit(1);
}
CloseConnector(GetTxnChannelName(txn->id()));
}
// TODO: Don't repeat code from Master.
std::string GetTxnChannelName(int64_t transaction_id) {
return "txn" + std::to_string(transaction_id);
}
void FindMaster() {
// TODO: Replace with network service and channel resolution.
while (!(master_channel_ = system_->FindChannel("master", "main")))
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::shared_ptr<Channel> master_channel_ = nullptr;
int worker_id_;
};
void ClientMain(System *system) {
std::shared_ptr<Channel> channel = nullptr;
// TODO: Replace this with network channel resolution.
while (!(channel = system->FindChannel("master", "main")))
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "I/O Client Main active" << std::endl;
bool active = true;
while (active) {
std::string s;
std::getline(std::cin, s);
if (s == "quit") {
active = false;
channel->Send<Quit>();
} else {
channel->Send<Query>(s);
}
}
}
int main(int argc, char *argv[]) {
//google::InitGoogleLogging(argv[0]);
gflags::ParseCommandLineFlags(&argc, &argv, true);
System system;
system.Spawn<Master>("master");
std::thread client(ClientMain, &system);
for (int i = 0; i < NUM_WORKERS; ++i)
system.Spawn<Worker>("worker" + std::to_string(i), i);
system.AwaitShutdown();
return 0;
}

View File

@ -7,14 +7,14 @@
namespace Protocol {
Session::Session(Socket &&socket, Data &data)
: socket_(std::move(socket)), system_(data.system) {
Session::Session(Socket &&socket, Data &)
: socket_(std::move(socket)) {
event_.data.ptr = this;
}
bool Session::Alive() const { return alive_; }
std::string Session::GetString(SizeT len) {
std::string Session::GetStringAndShift(SizeT len) {
std::string ret(reinterpret_cast<char *>(buffer_.data()), len);
buffer_.Shift(len);
return ret;
@ -22,30 +22,34 @@ std::string Session::GetString(SizeT len) {
void Session::Execute() {
if (!handshake_done_) {
SizeT len_reactor = GetLength();
SizeT len_channel = GetLength(2);
// Note: this function can be multiple times before the buffer has the full packet.
// We currently have to check for this case and return without shifting the buffer.
// In other words, only shift anything from the buffer if you can read the entire (sub)message.
if (len_reactor == 0 || len_channel == 0) return;
if (buffer_.size() < len_reactor + len_channel) return;
if (buffer_.size() < 2 * sizeof(SizeT)) return;
SizeT len_reactor = GetLength();
SizeT len_channel = GetLength(sizeof(SizeT));
if (buffer_.size() < 2 * sizeof(SizeT) + len_reactor + len_channel) return;
// remove the length bytes from the buffer
buffer_.Shift(2 * sizeof(SizeT));
reactor_ = GetString(len_reactor);
channel_ = GetString(len_channel);
reactor_ = GetStringAndShift(len_reactor);
channel_ = GetStringAndShift(len_channel);
std::cout << "Reactor: " << reactor_ << "; Channel: " << channel_
<< std::endl;
DLOG(INFO) << "Reactor: " << reactor_ << "; Channel: " << channel_
<< std::endl;
auto channel = system_->FindChannel(reactor_, channel_);
auto channel = System::GetInstance().FindChannel(reactor_, channel_);
SendSuccess(channel != nullptr);
handshake_done_ = true;
}
if (buffer_.size() < sizeof(SizeT)) return;
SizeT len_data = GetLength();
if (len_data == 0) return;
if (buffer_.size() < len_data) return;
if (buffer_.size() < sizeof(SizeT) + len_data) return;
// remove the length bytes from the buffer
buffer_.Shift(sizeof(SizeT));
@ -58,14 +62,13 @@ void Session::Execute() {
iarchive(message);
buffer_.Shift(len_data);
auto channel = system_->FindChannel(reactor_, channel_);
auto channel = System::GetInstance().FindChannel(reactor_, channel_);
if (channel == nullptr) {
SendSuccess(false);
return;
}
channel->Send(std::move(message));
SendSuccess(true);
}
@ -79,7 +82,6 @@ void Session::Close() {
}
SizeT Session::GetLength(int offset) {
if (buffer_.size() - offset < sizeof(SizeT)) return 0;
SizeT ret = *reinterpret_cast<SizeT *>(buffer_.data() + offset);
return ret;
}

View File

@ -7,7 +7,6 @@
#include "io/network/stream_buffer.hpp"
class Message;
class System;
/**
* @brief Protocol
@ -53,11 +52,10 @@ using SizeT = uint16_t;
/**
* Distributed Protocol Data
*
* This class is responsible for holding a pointer to System.
* This typically holds living data shared by all sessions. Currently empty.
*/
struct Data {
Data(System *_system) : system(_system) {}
System *system;
// empty
};
/**
@ -110,7 +108,7 @@ class Session {
private:
SizeT GetLength(int offset = 0);
std::string GetString(SizeT len);
std::string GetStringAndShift(SizeT len);
bool SendSuccess(bool success);
bool alive_{true};
@ -119,8 +117,6 @@ class Session {
std::string channel_{""};
Buffer buffer_;
System *system_;
};
/**

View File

@ -3,7 +3,7 @@
DEFINE_string(address, "127.0.0.1", "Network server bind address");
DEFINE_int32(port, 10000, "Network server bind port");
Network::Network(System *system) : system_(system), protocol_data_(system_) {}
Network::Network() {}
/**
* SenderMessage implementation.
@ -22,9 +22,9 @@ std::string SenderMessage::ReactorName() const { return reactor_; }
std::string SenderMessage::ChannelName() const { return channel_; }
std::shared_ptr<Channel> SenderMessage::GetChannelToSender(
System *system, Distributed *distributed) const {
Distributed *distributed) const {
if (address_ == FLAGS_address && port_ == FLAGS_port) {
return system->FindChannel(reactor_, channel_);
return System::GetInstance().FindChannel(reactor_, channel_);
}
if (distributed)
return distributed->network().Resolve(address_, port_, reactor_, channel_);

View File

@ -69,7 +69,7 @@ class Network {
};
public:
Network(System *system);
Network();
// client functions
@ -213,8 +213,6 @@ class Network {
}
private:
System *system_;
// client variables
SpinLock mutex_;
std::vector<std::thread> pool_;
@ -242,8 +240,7 @@ class SenderMessage : public Message {
std::string ReactorName() const;
std::string ChannelName() const;
std::shared_ptr<Channel> GetChannelToSender(System *system,
Distributed *distributed = nullptr) const;
std::shared_ptr<Channel> GetChannelToSender(Distributed *distributed = nullptr) const;
template <class Archive>
void serialize(Archive &ar) {
@ -282,7 +279,7 @@ class ChannelResolvedMessage : public Message {
*/
class Distributed {
public:
Distributed(System &system) : system_(system), network_(&system) {}
Distributed() {}
Distributed(const Distributed &) = delete;
Distributed(Distributed &&) = delete;
@ -321,19 +318,17 @@ class Distributed {
return stream_channel.first;
}
System &system() { return system_; }
Network &network() { return network_; }
protected:
System &system_;
Network network_;
};
class DistributedReactor : public Reactor {
public:
DistributedReactor(System *system, std::string name, Distributed &distributed)
: Reactor(system, name), distributed_(distributed) {}
DistributedReactor(std::string name, Distributed &distributed)
: Reactor(name), distributed_(distributed) {}
protected:
Distributed &distributed_;

View File

@ -93,9 +93,9 @@ void Reactor::RunEventLoop() {
if (exit_event_loop) break;
}
for (auto& cbAndSub : msg_and_cb.second) {
auto& cb = cbAndSub.first;
const Message& msg = *msg_and_cb.first;
for (auto &cbAndSub : msg_and_cb.second) {
auto &cb = cbAndSub.first;
const Message &msg = *msg_and_cb.first;
cb(msg, cbAndSub.second);
}
}
@ -106,8 +106,8 @@ void Reactor::RunEventLoop() {
*/
auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo {
// return type after because the scope Reactor:: is not searched before the name
for (auto& connectors_key_value : connectors_) {
Connector& event_queue = *connectors_key_value.second;
for (auto &connectors_key_value : connectors_) {
Connector &event_queue = *connectors_key_value.second;
auto msg_ptr = event_queue.LockedPop();
if (msg_ptr == nullptr) continue;
std::type_index tidx = msg_ptr->GetTypeIndex();
@ -115,7 +115,7 @@ auto Reactor::LockedGetPendingMessages() -> MsgAndCbInfo {
std::vector<std::pair<EventStream::Callback, EventStream::Subscription> > cb_info;
auto msg_type_cb_iter = event_queue.callbacks_.find(tidx);
if (msg_type_cb_iter != event_queue.callbacks_.end()) { // There is a callback for this type.
for (auto& tidx_cb_key_value : msg_type_cb_iter->second) {
for (auto &tidx_cb_key_value : msg_type_cb_iter->second) {
uint64_t uid = tidx_cb_key_value.first;
EventStream::Callback cb = tidx_cb_key_value.second;
cb_info.emplace_back(cb, EventStream::Subscription(event_queue, tidx, uid));

View File

@ -79,6 +79,10 @@ class EventStream {
*/
virtual std::unique_ptr<Message> PopEvent() = 0;
/**
* Get the name of the connector.
*/
virtual const std::string &ConnectorName() = 0;
/**
* Subscription Service.
*
@ -95,10 +99,10 @@ class EventStream {
friend class Reactor;
friend class Connector;
Subscription(Connector& event_queue, std::type_index tidx, uint64_t cb_uid)
Subscription(Connector &event_queue, std::type_index tidx, uint64_t cb_uid)
: event_queue_(event_queue), tidx_(tidx), cb_uid_(cb_uid) { }
Connector& event_queue_;
Connector &event_queue_;
std::type_index tidx_;
uint64_t cb_uid_;
};
@ -107,10 +111,10 @@ class EventStream {
* Register a callback that will be called whenever an event arrives.
*/
template<typename MsgType>
void OnEvent(std::function<void(const MsgType&, const Subscription&)>&& cb) {
OnEventHelper(typeid(MsgType), [cb = move(cb)](const Message& general_msg,
const Subscription& subscription) {
const MsgType& correct_msg = dynamic_cast<const MsgType&>(general_msg);
void OnEvent(std::function<void(const MsgType&, const Subscription&)> &&cb) {
OnEventHelper(typeid(MsgType), [cb = move(cb)](const Message &general_msg,
const Subscription &subscription) {
const MsgType &correct_msg = dynamic_cast<const MsgType&>(general_msg);
cb(correct_msg, subscription);
});
}
@ -149,16 +153,16 @@ class EventStream {
*/
class OnEventOnceChainer {
public:
OnEventOnceChainer(EventStream& event_stream) : event_stream_(event_stream) {}
OnEventOnceChainer(EventStream &event_stream) : event_stream_(event_stream) {}
~OnEventOnceChainer() {
InstallCallbacks();
}
template<typename MsgType>
OnEventOnceChainer& ChainOnce(std::function<void(const MsgType&)>&& cb) {
OnEventOnceChainer &ChainOnce(std::function<void(const MsgType&)> &&cb) {
std::function<void(const Message&, const Subscription&)> wrap =
[cb = std::move(cb)](const Message& general_msg, const Subscription& subscription) {
const MsgType& correct_msg = dynamic_cast<const MsgType&>(general_msg);
[cb = std::move(cb)](const Message &general_msg, const Subscription &subscription) {
const MsgType &correct_msg = dynamic_cast<const MsgType&>(general_msg);
subscription.unsubscribe();
cb(correct_msg); // Warning: this can close the Channel, be careful what you put after it!
};
@ -178,7 +182,7 @@ class EventStream {
tmp_cb = [cb = std::move(cbs_[i].second),
next_type,
next_cb = std::move(next_cb),
es_ptr = &this->event_stream_](const Message& msg, const Subscription& subscription) {
es_ptr = &this->event_stream_](const Message &msg, const Subscription &subscription) {
cb(msg, subscription);
if (next_cb != nullptr) {
es_ptr->OnEventHelper(next_type, std::move(next_cb));
@ -191,7 +195,7 @@ class EventStream {
event_stream_.OnEventHelper(next_type, std::move(next_cb));
}
EventStream& event_stream_;
EventStream &event_stream_;
std::vector<std::pair<std::type_index, std::function<void(const Message&, const Subscription&)>>> cbs_;
};
typedef std::function<void(const Message&, const Subscription&)> Callback;
@ -286,6 +290,9 @@ class Connector {
std::unique_lock<std::mutex> lock(*mutex_);
queue_->LockedOnEventHelper(tidx, callback);
}
const std::string &ConnectorName() {
return queue_->connector_name_;
}
void Close();
private:
@ -350,7 +357,7 @@ private:
return t;
}
void RemoveCb(const EventStream::Subscription& subscription) {
void RemoveCb(const EventStream::Subscription &subscription) {
std::unique_lock<std::mutex> lock(*mutex_);
size_t num_erased = callbacks_[subscription.tidx_].erase(subscription.cb_uid_);
assert(num_erased == 1);
@ -382,8 +389,8 @@ class Reactor {
public:
friend class System;
Reactor(System *system, std::string name)
: system_(system), name_(name), main_(Open("main")) {}
Reactor(std::string name)
: name_(name), main_(Open("main")) {}
virtual ~Reactor() {}
@ -407,13 +414,17 @@ class Reactor {
*/
void CloseAllConnectors();
/**
* Get Reactor name
*/
const std::string &name() { return name_; }
Reactor(const Reactor &other) = delete;
Reactor(Reactor &&other) = default;
Reactor &operator=(const Reactor &other) = delete;
Reactor &operator=(Reactor &&other) = default;
protected:
System *system_;
std::string name_;
/*
* Locks all Reactor data, including all Connector's in connectors_.
@ -450,27 +461,32 @@ class Reactor {
/**
* Global placeholder for all reactors in the system. Alive through the entire process lifetime.
* Global placeholder for all reactors in the system.
*
* E.g. holds set of reactors, channels for all reactors.
* Alive through the entire process lifetime.
* Singleton class. Created automatically.
*/
class System {
public:
friend class Reactor;
System() {}
System(const System &) = delete;
System(System &&) = delete;
System &operator=(const System &) = delete;
System &operator=(System &&) = delete;
/**
* Get the (singleton) instance of System.
*
* More info: https://stackoverflow.com/questions/1008019/c-singleton-design-pattern
*/
static System &GetInstance() {
static System system; // guaranteed to be destroyed, initialized on first use
return system;
}
template <class ReactorType, class... Args>
const std::shared_ptr<Channel> Spawn(const std::string &name,
Args &&... args) {
std::unique_lock<std::recursive_mutex> lock(mutex_);
auto *raw_reactor =
new ReactorType(this, name, std::forward<Args>(args)...);
new ReactorType(name, std::forward<Args>(args)...);
std::unique_ptr<Reactor> reactor(raw_reactor);
// Capturing a pointer isn't ideal, I would prefer to capture a Reactor&, but not sure how to do it.
std::thread reactor_thread(
@ -494,10 +510,17 @@ class System {
auto &thread = key_value.second.second;
thread.join();
}
reactors_.clear(); // for testing, since System is a singleton now
}
private:
void StartReactor(Reactor& reactor) {
System() {}
System(const System &) = delete;
System(System &&) = delete;
System &operator=(const System &) = delete;
System &operator=(System &&) = delete;
void StartReactor(Reactor &reactor) {
current_reactor_ = &reactor;
reactor.Run();
reactor.RunEventLoop(); // Activate callbacks.

View File

@ -1,7 +1,9 @@
#include "reactors_distributed.hpp"
#include <iostream>
#include <fstream>
#include "reactors_distributed.hpp"
#include <glog/logging.h>
DEFINE_int64(my_mnid, 0, "Memgraph node id"); // TODO(zuza): this should be assigned by the leader once in the future
DEFINE_string(config_filename, "", "File containing list of all processes");
@ -11,10 +13,10 @@ class MemgraphDistributed : public Distributed {
using Location = std::pair<std::string, uint16_t>;
public:
MemgraphDistributed(System &system) : Distributed(system) {}
MemgraphDistributed(System &system) : Distributed() {}
/** Register memgraph node id to the given location. */
void RegisterMemgraphNode(int64_t mnid, const std::string& address, uint16_t port) {
void RegisterMemgraphNode(int64_t mnid, const std::string &address, uint16_t port) {
std::unique_lock<std::recursive_mutex> lock(mutex_);
mnodes_[mnid] = Location(address, port);
}
@ -23,7 +25,7 @@ class MemgraphDistributed : public Distributed {
const std::string &reactor,
const std::string &channel) {
std::unique_lock<std::recursive_mutex> lock(mutex_);
const auto& location = mnodes_.at(mnid);
const auto &location = mnodes_.at(mnid);
return Distributed::FindChannel(location.first, location.second, reactor, channel);
}
@ -47,45 +49,70 @@ class MemgraphDistributed : public Distributed {
* @return Pair (master mnid, list of worker's id).
*/
std::pair<int64_t, std::vector<int64_t>>
ParseConfigAndRegister(const std::string& filename,
MemgraphDistributed& distributed) {
ParseConfigAndRegister(const std::string &filename,
MemgraphDistributed &distributed) {
std::ifstream file(filename, std::ifstream::in);
assert(file.good());
int64_t master_mnid;
std::vector<int64_t> worker_mnids;
int64_t mnid;
std::string address;
uint16_t port;
file >> master_mnid >> address >> port;
distributed.RegisterMemgraphNode(master_mnid, address, port);
std::string address;
uint16_t port;
file >> master_mnid >> address >> port;
distributed.RegisterMemgraphNode(master_mnid, address, port);
while (file.good()) {
file >> mnid >> address >> port;
file >> mnid >> address >> port;
if (file.eof())
break ;
distributed.RegisterMemgraphNode(mnid, address, port);
worker_mnids.push_back(mnid);
}
file.close();
return std::make_pair(master_mnid, worker_mnids);
}
file.close();
return std::make_pair(master_mnid, worker_mnids);
}
/**
* Interface to the Memgraph distributed system.
*
* E.g. get channel to other Memgraph nodes.
*/
class MemgraphReactor : public Reactor {
public:
MemgraphReactor(System* system, std::string name,
MemgraphDistributed &distributed)
: Reactor(system, name), distributed_(distributed) {}
MemgraphReactor(std::string name,
MemgraphDistributed &distributed)
: Reactor(name), distributed_(distributed) {}
protected:
MemgraphDistributed &distributed_;
};
/**
* Sends a text message and has a return address.
*/
class TextMessage : public SenderMessage {
public:
TextMessage(std::string reactor, std::string channel, std::string s)
: SenderMessage(reactor, channel), text(s) {}
template <class Archive>
void serialize(Archive &archive) {
archive(cereal::virtual_base_class<SenderMessage>(this), text);
}
std::string text;
protected:
friend class cereal::access;
TextMessage() {} // Cereal needs access to a default constructor.
};
CEREAL_REGISTER_TYPE(TextMessage);
class Master : public MemgraphReactor {
public:
Master(System* system, std::string name, MemgraphDistributed &distributed,
int64_t mnid, std::vector<int64_t>&& worker_mnids)
: MemgraphReactor(system, name, distributed), mnid_(mnid),
Master(std::string name, MemgraphDistributed &distributed,
int64_t mnid, std::vector<int64_t> &&worker_mnids)
: MemgraphReactor(name, distributed), mnid_(mnid),
worker_mnids_(std::move(worker_mnids)) {}
virtual void Run() {
@ -93,9 +120,11 @@ class Master : public MemgraphReactor {
<< ":" << distributed_.network().Port() << std::endl;
auto stream = main_.first;
stream->OnEvent<SenderMessage>([this](const SenderMessage &msg,
const EventStream::Subscription& subscription) {
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n";
// wait until every worker sends a SenderMessage back, then close
stream->OnEvent<TextMessage>([this](const TextMessage &msg,
const EventStream::Subscription &subscription) {
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n";
++workers_seen;
if (workers_seen == worker_mnids_.size()) {
subscription.unsubscribe();
@ -107,11 +136,12 @@ class Master : public MemgraphReactor {
}
});
// send a TextMessage to each worker
for (auto wmnid : worker_mnids_) {
auto stream = distributed_.FindChannel(wmnid, "worker", "main");
stream->OnEventOnce()
.ChainOnce<ChannelResolvedMessage>([this, stream](const ChannelResolvedMessage &msg){
msg.channel()->Send<SenderMessage>("master", "main");
msg.channel()->Send<TextMessage>("master", "main", "hi from master");
stream->Close();
});
}
@ -125,9 +155,9 @@ class Master : public MemgraphReactor {
class Worker : public MemgraphReactor {
public:
Worker(System* system, std::string name, MemgraphDistributed &distributed,
Worker(std::string name, MemgraphDistributed &distributed,
int64_t mnid, int64_t master_mnid)
: MemgraphReactor(system, name, distributed), mnid_(mnid),
: MemgraphReactor(name, distributed), mnid_(mnid),
master_mnid_(master_mnid) {}
virtual void Run() {
@ -135,20 +165,18 @@ class Worker : public MemgraphReactor {
<< ":" << distributed_.network().Port() << std::endl;
auto stream = main_.first;
// wait until master sends us a TextMessage, then reply back and close
stream->OnEventOnce()
.ChainOnce<SenderMessage>([this](const SenderMessage &msg) {
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << "\n";
.ChainOnce<TextMessage>([this](const TextMessage &msg) {
std::cout << "Message from " << msg.Address() << ":" << msg.Port() << " .. " << msg.text << "\n";
msg.GetChannelToSender(&distributed_)
->Send<TextMessage>("worker", "main", "hi from worker");
// Sleep for a while so we can read output in the terminal.
std::this_thread::sleep_for(std::chrono::seconds(4));
CloseConnector("main");
});
auto remote_stream = distributed_.FindChannel(master_mnid_, "master", "main");
remote_stream->OnEventOnce()
.ChainOnce<ChannelResolvedMessage>([this, remote_stream](const ChannelResolvedMessage &msg){
msg.channel()->Send<SenderMessage>("worker", "main");
remote_stream->Close();
});
}
protected:
@ -158,9 +186,10 @@ class Worker : public MemgraphReactor {
int main(int argc, char *argv[]) {
google::InitGoogleLogging(argv[0]);
gflags::ParseCommandLineFlags(&argc, &argv, true);
System system;
System &system = System::GetInstance();
MemgraphDistributed distributed(system);
auto mnids = ParseConfigAndRegister(FLAGS_config_filename, distributed);
distributed.StartServices();

View File

@ -0,0 +1,389 @@
// This is a deprecated implementation! It is using the deprecated AwaitEvent, I'm changing it to use OnEvent. WIP
// #include <atomic>
// #include <chrono>
// #include <cstdlib>
// #include <iostream>
// #include <string>
// #include <thread>
// #include <vector>
// #include "reactors_distributed.hpp"
// const int NUM_WORKERS = 1;
// class Txn : public SenderMessage {
// public:
// Txn(std::string reactor, std::string channel, int64_t id) : SenderMessage(reactor, channel), id_(id) {}
// int64_t id() const { return id_; }
// template <class Archive>
// void serialize(Archive &archive) {
// archive(cereal::base_class<SenderMessage>(this), id_);
// }
// private:
// int64_t id_;
// };
// class CreateNodeTxn : public Txn {
// public:
// CreateNodeTxn(std::string reactor, std::string channel, int64_t id) : Txn(reactor, channel, id) {}
// template <class Archive>
// void serialize(Archive &archive) {
// archive(cereal::base_class<Txn>(this));
// }
// };
// class CountNodesTxn : public Txn {
// public:
// CountNodesTxn(std::string reactor, std::string channel, int64_t id) : Txn(reactor, channel, id) {}
// template <class Archive>
// void serialize(Archive &archive) {
// archive(cereal::base_class<Txn>(this));
// }
// };
// class CountNodesTxnResult : public Message {
// public:
// CountNodesTxnResult(int64_t count) : count_(count) {}
// int64_t count() const { return count_; }
// template <class Archive>
// void serialize(Archive &archive) {
// archive(count_);
// }
// private:
// int64_t count_;
// };
// class CommitRequest : public SenderMessage {
// public:
// CommitRequest(std::string reactor, std::string channel, int64_t worker_id)
// : SenderMessage(reactor, channel), worker_id_(worker_id) {}
// int64_t worker_id() { return worker_id_; }
// template <class Archive>
// void serialize(Archive &archive) {
// archive(cereal::base_class<SenderMessage>(this), worker_id_);
// }
// private:
// int64_t worker_id_;
// };
// class AbortRequest : public SenderMessage {
// public:
// AbortRequest(std::string reactor, std::string channel, int64_t worker_id)
// : SenderMessage(reactor, channel), worker_id_(worker_id) {}
// int64_t worker_id() { return worker_id_; }
// template <class Archive>
// void serialize(Archive &archive) {
// archive(cereal::base_class<SenderMessage>(this), worker_id_);
// }
// private:
// int64_t worker_id_;
// };
// class CommitDirective : public Message {
// template <class Archive>
// void serialize(Archive &archive) {
// archive(cereal::base_class<Message>(this));
// }
// };
// class AbortDirective : public Message {
// template <class Archive>
// void serialize(Archive &archive) {
// archive(cereal::base_class<Message>(this));
// }
// };
// class Query : public Message {
// public:
// Query(std::string query) : Message(), query_(query) {}
// std::string query() const { return query_; }
// template <class Archive>
// void serialize(Archive &archive) {
// archive(cereal::base_class<Message>(this), query_);
// }
// private:
// std::string query_;
// };
// class Quit : public Message {
// template <class Archive>
// void serialize(Archive &archive) {
// archive(cereal::base_class<Message>(this));
// }
// };
// class Master : public Reactor {
// public:
// Master(System *system, std::string name) : Reactor(system, name), next_xid_(1) {}
// virtual void Run() {
// auto stream = main_.first;
// FindWorkers();
// std::cout << "Master is active" << std::endl;
// while (true) {
// auto m = stream->AwaitEvent();
// if (Query *query = dynamic_cast<Query *>(m.get())) {
// ProcessQuery(query);
// break; // process only the first query
// } else if (SenderMessage *msg = dynamic_cast<SenderMessage *>(m.get())) {
// std::cout << "SenderMessage received!" << std::endl;
// std::cout << " Address: " << msg->Address() << std::endl;
// std::cout << " Port: " << msg->Port() << std::endl;
// std::cout << " Reactor: " << msg->ReactorName() << std::endl;
// std::cout << " Channel: " << msg->ChannelName() << std::endl;
// } else {
// std::cerr << "unknown message\n";
// exit(1);
// }
// }
// stream->OnEvent<Message>([this](const Message &msg, const EventStream::Subscription& subscription) {
// std::cout << "Processing Query via Callback" << std::endl;
// const Query &query =
// dynamic_cast<const Query &>(msg); // exception bad_cast
// ProcessQuery(&query);
// subscription.unsubscribe();
// });
// }
// private:
// void ProcessQuery(const Query *query) {
// if (query->query() == "create node") {
// PerformCreateNode();
// } else if (query->query() == "count nodes") {
// PerformCountNodes();
// } else {
// std::cout << "got query: " << query->query() << std::endl;
// }
// }
// void PerformCreateNode() {
// int worker_id = rand() % NUM_WORKERS;
// int64_t xid = GetTransactionId();
// std::string txn_channel_name = GetTxnName(xid);
// auto connector = Open(txn_channel_name);
// auto stream = connector.first;
// channels_[worker_id]->Send<CreateNodeTxn>("master", "main", xid);
// auto m = stream->AwaitEvent();
// if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) {
// req->GetChannelToSender(system_)->Send<CommitDirective>();
// } else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) {
// req->GetChannelToSender(system_)->Send<AbortDirective>();
// } else {
// std::cerr << "unknown message\n";
// exit(1);
// }
// CloseConnector(txn_channel_name);
// }
// void PerformCountNodes() {
// int64_t xid = GetTransactionId();
// std::string txn_channel_name = GetTxnName(xid);
// auto connector = Open(txn_channel_name);
// auto stream = connector.first;
// for (int w_id = 0; w_id < NUM_WORKERS; ++w_id)
// channels_[w_id]->Send<CountNodesTxn>("master", "main", xid);
// std::vector<std::shared_ptr<Channel>> txn_channels;
// txn_channels.resize(NUM_WORKERS, nullptr);
// bool commit = true;
// for (int responds = 0; responds < NUM_WORKERS; ++responds) {
// auto m = stream->AwaitEvent();
// if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) {
// txn_channels[req->worker_id()] = req->GetChannelToSender(system_);
// commit &= true;
// } else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) {
// txn_channels[req->worker_id()] = req->GetChannelToSender(system_);
// commit = false;
// } else {
// std::cerr << "unknown message\n";
// exit(1);
// }
// }
// if (commit) {
// for (int w_id = 0; w_id < NUM_WORKERS; ++w_id)
// txn_channels[w_id]->Send<CommitDirective>();
// } else {
// for (int w_id = 0; w_id < NUM_WORKERS; ++w_id)
// txn_channels[w_id]->Send<AbortDirective>();
// }
// int64_t count = 0;
// for (int responds = 0; responds < NUM_WORKERS; ++responds) {
// auto m = stream->AwaitEvent();
// if (CountNodesTxnResult *cnt =
// dynamic_cast<CountNodesTxnResult *>(m.get())) {
// count += cnt->count();
// } else {
// std::cerr << "unknown message\n";
// exit(1);
// }
// }
// CloseConnector(txn_channel_name);
// std::cout << "graph has " << count << " vertices" << std::endl;
// }
// int64_t GetTransactionId() { return next_xid_++; }
// std::string GetWorkerName(int worker_id) {
// return "worker" + std::to_string(worker_id);
// }
// std::string GetTxnName(int txn_id) { return "txn" + std::to_string(txn_id); }
// void FindWorkers() {
// channels_.resize(NUM_WORKERS, nullptr);
// int workers_found = 0;
// while (workers_found < NUM_WORKERS) {
// for (int64_t w_id = 0; w_id < NUM_WORKERS; ++w_id) {
// if (channels_[w_id] == nullptr) {
// // TODO: Resolve worker channel using the network service.
// channels_[w_id] = system_->FindChannel(GetWorkerName(w_id), "main");
// if (channels_[w_id] != nullptr) ++workers_found;
// }
// }
// if (workers_found < NUM_WORKERS)
// std::this_thread::sleep_for(std::chrono::seconds(1));
// }
// }
// // TODO: Why is master atomic, it should be unique?
// std::atomic<int64_t> next_xid_;
// std::vector<std::shared_ptr<Channel>> channels_;
// };
// class Worker : public Reactor {
// public:
// Worker(System *system, std::string name, int64_t id) : Reactor(system, name),
// worker_id_(id) {}
// virtual void Run() {
// std::cout << "worker " << worker_id_ << " is active" << std::endl;
// auto stream = main_.first;
// FindMaster();
// while (true) {
// auto m = stream->AwaitEvent();
// if (Txn *txn = dynamic_cast<Txn *>(m.get())) {
// HandleTransaction(txn);
// } else {
// std::cerr << "unknown message\n";
// exit(1);
// }
// }
// }
// private:
// void HandleTransaction(Txn *txn) {
// if (CreateNodeTxn *create_txn = dynamic_cast<CreateNodeTxn *>(txn)) {
// HandleCreateNode(create_txn);
// } else if (CountNodesTxn *cnt_txn = dynamic_cast<CountNodesTxn *>(txn)) {
// HandleCountNodes(cnt_txn);
// } else {
// std::cerr << "unknown transaction\n";
// exit(1);
// }
// }
// void HandleCreateNode(CreateNodeTxn *txn) {
// auto connector = Open(GetTxnChannelName(txn->id()));
// auto stream = connector.first;
// auto masterChannel = txn->GetChannelToSender(system_);
// // TODO: Do the actual commit.
// masterChannel->Send<CommitRequest>("master", "main", worker_id_);
// auto m = stream->AwaitEvent();
// if (dynamic_cast<CommitDirective *>(m.get())) {
// // TODO: storage_.CreateNode();
// } else if (dynamic_cast<AbortDirective *>(m.get())) {
// // TODO: Rollback.
// } else {
// std::cerr << "unknown message\n";
// exit(1);
// }
// CloseConnector(GetTxnChannelName(txn->id()));
// }
// void HandleCountNodes(CountNodesTxn *txn) {
// auto connector = Open(GetTxnChannelName(txn->id()));
// auto stream = connector.first;
// auto masterChannel = txn->GetChannelToSender(system_);
// // TODO: Fix this hack -- use the storage.
// int num = 123;
// masterChannel->Send<CommitRequest>("master", "main", worker_id_);
// auto m = stream->AwaitEvent();
// if (dynamic_cast<CommitDirective *>(m.get())) {
// masterChannel->Send<CountNodesTxnResult>(num);
// } else if (dynamic_cast<AbortDirective *>(m.get())) {
// // send nothing
// } else {
// std::cerr << "unknown message\n";
// exit(1);
// }
// CloseConnector(GetTxnChannelName(txn->id()));
// }
// // TODO: Don't repeat code from Master.
// std::string GetTxnChannelName(int64_t transaction_id) {
// return "txn" + std::to_string(transaction_id);
// }
// void FindMaster() {
// // TODO: Replace with network service and channel resolution.
// while (!(master_channel_ = system_->FindChannel("master", "main")))
// std::this_thread::sleep_for(std::chrono::seconds(1));
// }
// std::shared_ptr<Channel> master_channel_ = nullptr;
// int worker_id_;
// };
// void ClientMain(System *system) {
// std::shared_ptr<Channel> channel = nullptr;
// // TODO: Replace this with network channel resolution.
// while (!(channel = system->FindChannel("master", "main")))
// std::this_thread::sleep_for(std::chrono::seconds(1));
// std::cout << "I/O Client Main active" << std::endl;
// bool active = true;
// while (active) {
// std::string s;
// std::getline(std::cin, s);
// if (s == "quit") {
// active = false;
// channel->Send<Quit>();
// } else {
// channel->Send<Query>(s);
// }
// }
// }
// int main(int argc, char *argv[]) {
// //google::InitGoogleLogging(argv[0]);
// gflags::ParseCommandLineFlags(&argc, &argv, true);
// System system;
// system.Spawn<Master>("master");
// std::thread client(ClientMain, &system);
// for (int i = 0; i < NUM_WORKERS; ++i)
// system.Spawn<Worker>("worker" + std::to_string(i), i);
// system.AwaitShutdown();
// return 0;
// }

View File

@ -38,8 +38,8 @@ CEREAL_REGISTER_TYPE(ChatACK);
class ChatServer : public DistributedReactor {
public:
ChatServer(System *system, std::string name, Distributed &distributed)
: DistributedReactor(system, name, distributed) {}
ChatServer(std::string name, Distributed &distributed)
: DistributedReactor(name, distributed) {}
virtual void Run() {
std::cout << "ChatServer is active" << std::endl;
@ -56,7 +56,7 @@ class ChatServer : public DistributedReactor {
std::cout << "Received message from " << msg.Address() << ":"
<< msg.Port() << " -> '" << msg.Message() << "'"
<< std::endl;
auto channel = msg.GetChannelToSender(system_);
auto channel = msg.GetChannelToSender();
if (channel != nullptr) {
channel->Send<ChatACK>("server", "chat", msg.Message());
}
@ -66,8 +66,8 @@ class ChatServer : public DistributedReactor {
class ChatClient : public DistributedReactor {
public:
ChatClient(System *system, std::string name, Distributed &distributed)
: DistributedReactor(system, name, distributed) {}
ChatClient(std::string name, Distributed &distributed)
: DistributedReactor(name, distributed) {}
virtual void Run() {
std::cout << "ChatClient is active" << std::endl;
@ -91,8 +91,8 @@ class ChatClient : public DistributedReactor {
int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
System system;
Distributed distributed(system);
System& system = System::GetInstance();
Distributed distributed;
distributed.StartServices();
system.Spawn<ChatServer>("server", distributed);
system.Spawn<ChatClient>("client", distributed);

View File

@ -2,8 +2,7 @@
int main(int argc, char *argv[]) {
google::InitGoogleLogging(argv[0]);
System system;
Distributed distributed(system);
Distributed distributed;
distributed.network().StartClient(1);
auto channel = distributed.network().Resolve("127.0.0.1", 10000, "master", "main");
std::cout << channel << std::endl;

View File

@ -13,13 +13,13 @@
TEST(SystemTest, ReturnWithoutThrowing) {
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
Master(std::string name) : Reactor(name) {}
virtual void Run() {
CloseConnector("main");
}
};
System system;
System &system = System::GetInstance();
ASSERT_NO_THROW(system.Spawn<Master>("master"));
ASSERT_NO_THROW(system.AwaitShutdown());
}
@ -27,7 +27,7 @@ TEST(SystemTest, ReturnWithoutThrowing) {
TEST(ChannelCreationTest, ThrowOnReusingChannelName) {
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
Master(std::string name) : Reactor(name) {}
virtual void Run() {
Open("channel");
ASSERT_THROW(Open("channel"), std::runtime_error);
@ -36,7 +36,7 @@ TEST(ChannelCreationTest, ThrowOnReusingChannelName) {
}
};
System system;
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.AwaitShutdown();
}
@ -44,10 +44,10 @@ TEST(ChannelCreationTest, ThrowOnReusingChannelName) {
TEST(ConnectorSetUpTest, CheckMainChannelIsSet) {
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
while (!(channel = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
CloseConnector("main");
@ -55,17 +55,17 @@ TEST(ConnectorSetUpTest, CheckMainChannelIsSet) {
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("master", "main")))
while (!(channel = System::GetInstance().FindChannel("master", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
CloseConnector("main");
}
};
System system;
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
@ -79,10 +79,10 @@ TEST(SimpleSendTest, OneSimpleSend) {
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
while (!(channel = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(123);
CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII.
@ -90,7 +90,7 @@ TEST(SimpleSendTest, OneSimpleSend) {
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
EventStream* stream = main_.first;
std::unique_ptr<Message> m_uptr = stream->AwaitEvent();
@ -101,7 +101,7 @@ TEST(SimpleSendTest, OneSimpleSend) {
}
};
System system;
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
@ -114,10 +114,10 @@ TEST(SimpleSendTest, OneCallback) {
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
while (!(channel = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(888);
CloseConnector("main");
@ -125,18 +125,18 @@ TEST(SimpleSendTest, OneCallback) {
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
EventStream* stream = main_.first;
stream->OnEvent<MessageInt>([this](const MessageInt& msg, const EventStream::Subscription&) {
stream->OnEvent<MessageInt>([this](const MessageInt &msg, const EventStream::Subscription&) {
ASSERT_EQ(msg.x, 888);
CloseConnector("main");
});
}
};
System system;
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
@ -150,10 +150,10 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
while (!(channel = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(101);
channel->Send<MessageInt>(102); // should be ignored
@ -165,7 +165,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
EventStream* stream = main_.first;
std::unique_ptr<Message> m_uptr = stream->AwaitEvent();
@ -176,7 +176,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
}
};
System system;
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
@ -190,12 +190,12 @@ TEST(SimpleSendTest, DuringFirstEvent) {
};
struct Master : public Reactor {
Master(System *system, std::string name, std::promise<int> p) : Reactor(system, name), p_(std::move(p)) {}
Master(std::string name, std::promise<int> p) : Reactor(name), p_(std::move(p)) {}
virtual void Run() {
EventStream* stream = main_.first;
stream->OnEvent<MessageInt>([this](const Message& msg, const EventStream::Subscription& subscription) {
const MessageInt& msgint = dynamic_cast<const MessageInt&>(msg);
stream->OnEvent<MessageInt>([this](const Message &msg, const EventStream::Subscription &subscription) {
const MessageInt &msgint = dynamic_cast<const MessageInt&>(msg);
if (msgint.x == 101)
FindChannel("main")->Send<MessageInt>(102);
if (msgint.x == 102) {
@ -211,7 +211,7 @@ TEST(SimpleSendTest, DuringFirstEvent) {
std::promise<int> p_;
};
System system;
System &system = System::GetInstance();
std::promise<int> p;
auto f = p.get_future();
system.Spawn<Master>("master", std::move(p));
@ -232,10 +232,10 @@ TEST(MultipleSendTest, UnsubscribeService) {
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
while (!(channel = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(55);
channel->Send<MessageInt>(66);
@ -251,21 +251,21 @@ TEST(MultipleSendTest, UnsubscribeService) {
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
Worker(std::string name) : Reactor(name) {}
int num_msgs_received = 0;
virtual void Run() {
EventStream* stream = main_.first;
stream->OnEvent<MessageInt>([this](const MessageInt& msgint, const EventStream::Subscription& subscription) {
stream->OnEvent<MessageInt>([this](const MessageInt &msgint, const EventStream::Subscription &subscription) {
ASSERT_TRUE(msgint.x == 55 || msgint.x == 66);
++num_msgs_received;
if (msgint.x == 66) {
subscription.unsubscribe(); // receive only two of them
}
});
stream->OnEvent<MessageChar>([this](const MessageChar& msgchar, const EventStream::Subscription& subscription) {
stream->OnEvent<MessageChar>([this](const MessageChar &msgchar, const EventStream::Subscription &subscription) {
char c = msgchar.x;
++num_msgs_received;
ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c');
@ -277,7 +277,7 @@ TEST(MultipleSendTest, UnsubscribeService) {
}
};
System system;
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
@ -295,10 +295,10 @@ TEST(MultipleSendTest, OnEvent) {
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
while (!(channel = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(101);
@ -310,7 +310,7 @@ TEST(MultipleSendTest, OnEvent) {
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
Worker(std::string name) : Reactor(name) {}
struct EndMessage : Message {};
int correct_vals = 0;
@ -319,13 +319,13 @@ TEST(MultipleSendTest, OnEvent) {
EventStream* stream = main_.first;
correct_vals = 0;
stream->OnEvent<MessageInt>([this](const MessageInt& msgint, const EventStream::Subscription&) {
stream->OnEvent<MessageInt>([this](const MessageInt &msgint, const EventStream::Subscription&) {
ASSERT_TRUE(msgint.x == 101 || msgint.x == 103);
++correct_vals;
main_.second->Send<EndMessage>();
});
stream->OnEvent<MessageChar>([this](const MessageChar& msgchar, const EventStream::Subscription&) {
stream->OnEvent<MessageChar>([this](const MessageChar &msgchar, const EventStream::Subscription&) {
ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b');
++correct_vals;
main_.second->Send<EndMessage>();
@ -340,7 +340,7 @@ TEST(MultipleSendTest, OnEvent) {
}
};
System system;
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
@ -353,10 +353,10 @@ TEST(MultipleSendTest, Chaining) {
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
while (!(channel = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(55);
channel->Send<MessageInt>(66);
@ -366,26 +366,26 @@ TEST(MultipleSendTest, Chaining) {
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
EventStream* stream = main_.first;
stream->OnEventOnce()
.ChainOnce<MessageInt>([this](const MessageInt& msg) {
.ChainOnce<MessageInt>([this](const MessageInt &msg) {
ASSERT_EQ(msg.x, 55);
})
.ChainOnce<MessageInt>([](const MessageInt& msg) {
.ChainOnce<MessageInt>([](const MessageInt &msg) {
ASSERT_EQ(msg.x, 66);
})
.ChainOnce<MessageInt>([this](const MessageInt& msg) {
.ChainOnce<MessageInt>([this](const MessageInt &msg) {
ASSERT_EQ(msg.x, 77);
CloseConnector("main");
});
}
};
System system;
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
@ -404,10 +404,10 @@ TEST(MultipleSendTest, ChainingInRightOrder) {
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
while (!(channel = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageChar>('a');
channel->Send<MessageInt>(55);
@ -418,26 +418,26 @@ TEST(MultipleSendTest, ChainingInRightOrder) {
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
Worker(std::string name) : Reactor(name) {}
virtual void Run() {
EventStream* stream = main_.first;
stream->OnEventOnce()
.ChainOnce<MessageInt>([this](const MessageInt& msg) {
.ChainOnce<MessageInt>([this](const MessageInt &msg) {
ASSERT_EQ(msg.x, 55);
})
.ChainOnce<MessageChar>([](const MessageChar& msg) {
.ChainOnce<MessageChar>([](const MessageChar &msg) {
ASSERT_EQ(msg.x, 'b');
})
.ChainOnce<MessageInt>([this](const MessageInt& msg) {
.ChainOnce<MessageInt>([this](const MessageInt &msg) {
ASSERT_EQ(msg.x, 77);
CloseConnector("main");
});
}
};
System system;
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
@ -453,10 +453,10 @@ TEST(MultipleSendTest, ProcessManyMessages) {
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
Master(std::string name) : Reactor(name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
while (!(channel = System::GetInstance().FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
@ -469,7 +469,7 @@ TEST(MultipleSendTest, ProcessManyMessages) {
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
Worker(std::string name) : Reactor(name) {}
struct EndMessage : Message {};
int vals = 0;
@ -492,7 +492,7 @@ TEST(MultipleSendTest, ProcessManyMessages) {
}
};
System system;
System &system = System::GetInstance();
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();

View File

@ -6,7 +6,7 @@ import os
command = 'gnome-terminal'
program = './distributed_test'
config_filename = 'config'
flags = ' --minloglevel 2'
flags = '--minloglevel=2'
f = open(config_filename, 'r')
for line in f:
@ -14,8 +14,9 @@ for line in f:
my_mnid = data[0]
address = data[1]
port = data[2]
call = program + flags + ' --my_mnid ' + my_mnid + ' --address ' + address +\
' --port ' + port + ' --config_filename ' + config_filename
command += " --tab -e '" + call + "'"
call = "{} {} --my_mnid {} --address {} --port {} --config_filename={}".format(
program, flags, my_mnid, address, port, config_filename)
command += " --tab -e '{}'".format(call)
os.system(command)
print(command)
os.system(command)