Initial version of distributed networking stack.

Reviewers: buda, lion

Reviewed By: buda

Subscribers: pullbot, zuza

Differential Revision: https://phabricator.memgraph.io/D636
This commit is contained in:
Matej Ferencevic 2017-08-07 16:04:24 +02:00
parent 59b395626c
commit 2ba0f0cdea
7 changed files with 681 additions and 106 deletions

View File

@ -12,7 +12,7 @@ const int NUM_WORKERS = 1;
class Txn : public SenderMessage {
public:
Txn(std::shared_ptr<Channel> channel, int64_t id) : SenderMessage(channel), id_(id) {}
Txn(std::string reactor, std::string channel, int64_t id) : SenderMessage(reactor, channel), id_(id) {}
int64_t id() const { return id_; }
template <class Archive>
@ -26,7 +26,7 @@ class Txn : public SenderMessage {
class CreateNodeTxn : public Txn {
public:
CreateNodeTxn(std::shared_ptr<Channel> channel, int64_t id) : Txn(channel, id) {}
CreateNodeTxn(std::string reactor, std::string channel, int64_t id) : Txn(reactor, channel, id) {}
template <class Archive>
void serialize(Archive &archive) {
@ -36,7 +36,7 @@ class CreateNodeTxn : public Txn {
class CountNodesTxn : public Txn {
public:
CountNodesTxn(std::shared_ptr<Channel> channel, int64_t id) : Txn(channel, id) {}
CountNodesTxn(std::string reactor, std::string channel, int64_t id) : Txn(reactor, channel, id) {}
template <class Archive>
void serialize(Archive &archive) {
@ -60,8 +60,8 @@ class CountNodesTxnResult : public Message {
class CommitRequest : public SenderMessage {
public:
CommitRequest(std::shared_ptr<Channel> sender, int64_t worker_id)
: SenderMessage(sender), worker_id_(worker_id) {}
CommitRequest(std::string reactor, std::string channel, int64_t worker_id)
: SenderMessage(reactor, channel), worker_id_(worker_id) {}
int64_t worker_id() { return worker_id_; }
template <class Archive>
@ -75,8 +75,8 @@ class CommitRequest : public SenderMessage {
class AbortRequest : public SenderMessage {
public:
AbortRequest(std::shared_ptr<Channel> sender, int64_t worker_id)
: SenderMessage(sender), worker_id_(worker_id) {}
AbortRequest(std::string reactor, std::string channel, int64_t worker_id)
: SenderMessage(reactor, channel), worker_id_(worker_id) {}
int64_t worker_id() { return worker_id_; }
template <class Archive>
@ -137,6 +137,12 @@ class Master : public Reactor {
if (Query *query = dynamic_cast<Query *>(m.get())) {
ProcessQuery(query);
break; // process only the first query
} else if (SenderMessage *msg = dynamic_cast<SenderMessage *>(m.get())) {
std::cout << "SenderMessage received!" << std::endl;
std::cout << " Address: " << msg->Address() << std::endl;
std::cout << " Port: " << msg->Port() << std::endl;
std::cout << " Reactor: " << msg->ReactorName() << std::endl;
std::cout << " Channel: " << msg->ChannelName() << std::endl;
} else {
std::cerr << "unknown message\n";
exit(1);
@ -171,13 +177,13 @@ class Master : public Reactor {
auto stream = connector.first;
auto create_node_txn =
std::make_unique<CreateNodeTxn>(connector.second, xid);
std::make_unique<CreateNodeTxn>("master", "main", xid);
channels_[worker_id]->Send(std::move(create_node_txn));
auto m = stream->AwaitEvent();
if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) {
req->sender()->Send(std::make_unique<CommitDirective>());
req->GetChannelToSender(system_)->Send(std::make_unique<CommitDirective>());
} else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) {
req->sender()->Send(std::make_unique<AbortDirective>());
req->GetChannelToSender(system_)->Send(std::make_unique<AbortDirective>());
} else {
std::cerr << "unknown message\n";
exit(1);
@ -192,7 +198,7 @@ class Master : public Reactor {
auto stream = connector.first;
for (int w_id = 0; w_id < NUM_WORKERS; ++w_id)
channels_[w_id]->Send(
std::make_unique<CountNodesTxn>(connector.second, xid));
std::make_unique<CountNodesTxn>("master", "main", xid));
std::vector<std::shared_ptr<Channel>> txn_channels;
txn_channels.resize(NUM_WORKERS, nullptr);
@ -200,10 +206,10 @@ class Master : public Reactor {
for (int responds = 0; responds < NUM_WORKERS; ++responds) {
auto m = stream->AwaitEvent();
if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) {
txn_channels[req->worker_id()] = req->sender();
txn_channels[req->worker_id()] = req->GetChannelToSender(system_);
commit &= true;
} else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) {
txn_channels[req->worker_id()] = req->sender();
txn_channels[req->worker_id()] = req->GetChannelToSender(system_);
commit = false;
} else {
std::cerr << "unknown message\n";
@ -299,10 +305,10 @@ class Worker : public Reactor {
void HandleCreateNode(CreateNodeTxn *txn) {
auto connector = Open(GetTxnChannelName(txn->id()));
auto stream = connector.first;
auto masterChannel = txn->sender();
auto masterChannel = txn->GetChannelToSender(system_);
// TODO: Do the actual commit.
masterChannel->Send(
std::make_unique<CommitRequest>(connector.second, worker_id_));
std::make_unique<CommitRequest>("master", "main", worker_id_));
auto m = stream->AwaitEvent();
if (dynamic_cast<CommitDirective *>(m.get())) {
// TODO: storage_.CreateNode();
@ -318,13 +324,13 @@ class Worker : public Reactor {
void HandleCountNodes(CountNodesTxn *txn) {
auto connector = Open(GetTxnChannelName(txn->id()));
auto stream = connector.first;
auto masterChannel = txn->sender();
auto masterChannel = txn->GetChannelToSender(system_);
// TODO: Fix this hack -- use the storage.
int num = 123;
masterChannel->Send(
std::make_unique<CommitRequest>(connector.second, worker_id_));
std::make_unique<CommitRequest>("master", "main", worker_id_));
auto m = stream->AwaitEvent();
if (dynamic_cast<CommitDirective *>(m.get())) {
masterChannel->Send(std::make_unique<CountNodesTxnResult>(num));
@ -372,8 +378,12 @@ void ClientMain(System *system) {
}
}
int main() {
int main(int argc, char *argv[]) {
//google::InitGoogleLogging(argv[0]);
gflags::ParseCommandLineFlags(&argc, &argv, true);
System system;
system.StartServices();
system.Spawn<Master>("master");
std::thread client(ClientMain, &system);
for (int i = 0; i < NUM_WORKERS; ++i)

View File

@ -1,16 +1,19 @@
#include "communication.hpp"
DEFINE_string(address, "127.0.0.1", "Network server bind address");
DEFINE_int32(port, 10000, "Network server bind port");
void EventStream::Subscription::unsubscribe() {
event_queue_.RemoveCbByUid(cb_uid_);
}
thread_local Reactor* current_reactor_ = nullptr;
std::string Connector::LocalChannel::Hostname() {
return system_->network().Hostname();
std::string Connector::LocalChannel::Address() {
return system_->network().Address();
}
int32_t Connector::LocalChannel::Port() {
uint16_t Connector::LocalChannel::Port() {
return system_->network().Port();
}
@ -127,6 +130,29 @@ auto Reactor::LockedGetPendingMessages(std::unique_lock<std::mutex> &lock) -> Ms
return MsgAndCbInfo(nullptr, {});
}
Network::Network(System *system) : system_(system),
hostname_(system->config().GetString("hostname")),
port_(system->config().GetInt("port")) {}
Network::Network(System *system) : system_(system), protocol_data_(system_) {}
/**
* SenderMessage implementation.
*/
SenderMessage::SenderMessage() {}
SenderMessage::SenderMessage(std::string reactor, std::string channel)
: address_(FLAGS_address),
port_(FLAGS_port),
reactor_(reactor),
channel_(channel) {}
std::string SenderMessage::Address() const { return address_; }
uint16_t SenderMessage::Port() const { return port_; }
std::string SenderMessage::ReactorName() const { return reactor_; }
std::string SenderMessage::ChannelName() const { return channel_; }
std::shared_ptr<Channel> SenderMessage::GetChannelToSender(
System *system) const {
if (address_ == system->network().Address() &&
port_ == system->network().Port()) {
return system->FindChannel(reactor_, channel_);
}
return system->network().Resolve(address_, port_, reactor_, channel_);
}

View File

@ -13,7 +13,23 @@
#include <tuple>
#include <unordered_map>
#include <gflags/gflags.h>
#include "protocol.hpp"
#include "cereal/archives/binary.hpp"
#include "cereal/types/base_class.hpp"
#include "cereal/types/memory.hpp"
#include "cereal/types/polymorphic.hpp"
#include "cereal/types/string.hpp"
#include "cereal/types/utility.hpp" // utility has to be included because of std::pair
#include "cereal/types/vector.hpp"
#include "communication/server.hpp"
#include "threading/sync/spinlock.hpp"
DECLARE_string(address);
DECLARE_int32(port);
class Message;
class EventStream;
@ -30,9 +46,9 @@ class Channel {
public:
virtual void Send(std::unique_ptr<Message>) = 0;
virtual std::string Hostname() = 0;
virtual std::string Address() = 0;
virtual int32_t Port() = 0;
virtual uint16_t Port() = 0;
virtual std::string ReactorName() = 0;
@ -42,7 +58,7 @@ class Channel {
template <class Archive>
void serialize(Archive &archive) {
archive(Hostname(), Port(), ReactorName(), Name());
archive(Address(), Port(), ReactorName(), Name());
}
};
@ -155,9 +171,9 @@ class Connector {
}
}
virtual std::string Hostname();
virtual std::string Address();
virtual int32_t Port();
virtual uint16_t Port();
virtual std::string ReactorName();
@ -355,45 +371,59 @@ class Reactor {
MsgAndCbInfo LockedGetPendingMessages(std::unique_lock<std::mutex> &lock);
};
/**
* Configuration service.
*/
class Config {
public:
Config(System *system) : system_(system) {}
std::string GetString(std::string key) {
// TODO: Use configuration lib.
assert(key == "hostname");
return "localhost";
}
int32_t GetInt(std::string key) {
// TODO: Use configuration lib.
assert(key == "port");
return 8080;
}
private:
System *system_;
};
/**
* Networking service.
*/
class Network {
private:
using Endpoint = Protocol::Endpoint;
using Socket = Protocol::Socket;
using NetworkServer = communication::Server<Protocol::Session,
Protocol::Socket, Protocol::Data>;
struct NetworkMessage {
NetworkMessage()
: address(""), port(0), reactor(""), channel(""), message(nullptr) {}
NetworkMessage(std::string _address, uint16_t _port, std::string _reactor,
std::string _channel, std::unique_ptr<Message> _message)
: address(_address),
port(_port),
reactor(_reactor),
channel(_channel),
message(std::move(_message)) {}
NetworkMessage(NetworkMessage &&nm)
: address(nm.address),
port(nm.port),
reactor(nm.reactor),
channel(nm.channel),
message(std::move(nm.message)) {}
std::string address;
uint16_t port;
std::string reactor;
std::string channel;
std::unique_ptr<Message> message;
};
public:
Network(System *system);
std::string Hostname() { return hostname_; }
// client functions
int32_t Port() { return port_; }
std::shared_ptr<Channel> Resolve(std::string hostname, int32_t port) {
// TODO: Synchronously resolve and return channel.
std::shared_ptr<Channel> Resolve(std::string address, uint16_t port,
std::string reactor_name,
std::string channel_name) {
if (Protocol::SendMessage(address, port, reactor_name, channel_name,
nullptr)) {
return std::make_shared<RemoteChannel>(this, address, port, reactor_name,
channel_name);
}
return nullptr;
}
std::shared_ptr<EventStream> AsyncResolve(std::string hostname, int32_t port,
std::shared_ptr<EventStream> AsyncResolve(std::string address, uint16_t port,
int32_t retries,
std::chrono::seconds cooldown) {
// TODO: Asynchronously resolve channel, and return an event stream
@ -401,31 +431,139 @@ class Network {
return nullptr;
}
void StartClient(int worker_count) {
LOG(INFO) << "Starting " << worker_count << " client workers";
for (int i = 0; i < worker_count; ++i) {
pool_.push_back(std::thread([worker_count, this]() {
while (this->client_run_) {
this->mutex_.lock();
if (!this->queue_.empty()) {
NetworkMessage nm(std::move(this->queue_.front()));
this->queue_.pop();
this->mutex_.unlock();
// TODO: store success
bool success =
Protocol::SendMessage(nm.address, nm.port, nm.reactor,
nm.channel, std::move(nm.message));
std::cout << "Network client message send status: " << success << std::endl;
} else {
this->mutex_.unlock();
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}));
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
void StopClient() {
while (true) {
std::lock_guard<SpinLock> lock(mutex_);
if (queue_.empty()) {
break;
}
}
client_run_ = false;
for (int i = 0; i < pool_.size(); ++i) {
pool_[i].join();
}
}
class RemoteChannel : public Channel {
public:
RemoteChannel() {}
RemoteChannel(Network *network, std::string address, uint16_t port,
std::string reactor, std::string channel)
: network_(network),
address_(address),
port_(port),
reactor_(reactor),
channel_(channel) {}
virtual std::string Hostname() {
throw std::runtime_error("Unimplemented.");
}
virtual std::string Address() { return address_; }
virtual int32_t Port() { throw std::runtime_error("Unimplemented."); }
virtual uint16_t Port() { return port_; }
virtual std::string ReactorName() {
throw std::runtime_error("Unimplemented.");
}
virtual std::string ReactorName() { return reactor_; }
virtual std::string Name() { throw std::runtime_error("Unimplemented."); }
virtual std::string Name() { return channel_; }
virtual void Send(std::unique_ptr<Message> message) {
// TODO: Implement.
network_->mutex_.lock();
network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_,
std::move(message)));
network_->mutex_.unlock();
}
private:
Network *network_;
std::string address_;
uint16_t port_;
std::string reactor_;
std::string channel_;
};
// server functions
std::string Address() { return FLAGS_address; }
uint16_t Port() { return FLAGS_port; }
void StartServer(int workers_count) {
if (server_ != nullptr) {
LOG(FATAL) << "Tried to start a running server!";
}
// Initialize endpoint.
Endpoint endpoint;
try {
endpoint = Endpoint(FLAGS_address.c_str(), FLAGS_port);
} catch (io::network::NetworkEndpointException &e) {
LOG(FATAL) << e.what();
}
// Initialize socket.
Socket socket;
if (!socket.Bind(endpoint)) {
LOG(FATAL) << "Cannot bind to socket on " << FLAGS_address << " at "
<< FLAGS_port;
}
if (!socket.SetNonBlocking()) {
LOG(FATAL) << "Cannot set socket to non blocking!";
}
if (!socket.Listen(1024)) {
LOG(FATAL) << "Cannot listen on socket!";
}
// Initialize server
server_ =
std::make_unique<NetworkServer>(std::move(socket), protocol_data_);
// Start server
thread_ = std::thread(
[workers_count, this]() { this->server_->Start(workers_count); });
}
void StopServer() {
if (server_ != nullptr) {
server_->Shutdown();
server_ = nullptr;
}
thread_.join();
}
private:
System *system_;
std::string hostname_;
int32_t port_;
// client variables
SpinLock mutex_;
std::vector<std::thread> pool_;
std::queue<NetworkMessage> queue_;
std::atomic<bool> client_run_{true};
// server variables
std::thread thread_;
Protocol::Data protocol_data_;
std::unique_ptr<NetworkServer> server_{nullptr};
};
/**
@ -444,44 +582,29 @@ class Message {
*/
class SenderMessage : public Message {
public:
SenderMessage(std::shared_ptr<Channel> sender) : sender_(sender) {}
SenderMessage();
SenderMessage(std::string reactor, std::string channel);
std::shared_ptr<Channel> sender() { return sender_; }
std::string Address() const;
uint16_t Port() const;
std::string ReactorName() const;
std::string ChannelName() const;
std::shared_ptr<Channel> GetChannelToSender(System *system) const;
template <class Archive>
void serialize(Archive &ar) {
ar(sender_);
ar(cereal::virtual_base_class<Message>(this), address_, port_,
reactor_, channel_);
}
private:
std::shared_ptr<Channel> sender_;
};
/**
* Serialization service.
*/
class Serialization {
public:
using SerializedT = std::pair<char *, int64_t>;
Serialization(System *system) : system_(system) {}
SerializedT serialize(const Message &) {
SerializedT serialized;
throw std::runtime_error("Not yet implemented (Serialization::serialized)");
return serialized;
}
Message deserialize(const SerializedT &) {
Message message;
throw std::runtime_error(
"Not yet implemented (Serialization::deserialize)");
return message;
}
private:
System *system_;
std::string address_;
uint16_t port_;
std::string reactor_;
std::string channel_;
};
CEREAL_REGISTER_TYPE(SenderMessage);
/**
* Global placeholder for all reactors in the system. Alive through the entire process lifetime.
@ -492,10 +615,15 @@ class System {
public:
friend class Reactor;
System() : config_(this), network_(this), serialization_(this) {}
System() : network_(this) {}
void operator=(const System &) = delete;
void StartServices() {
network_.StartClient(4);
network_.StartServer(4);
}
template <class ReactorType, class... Args>
const std::shared_ptr<Channel> Spawn(const std::string &name,
Args &&... args) {
@ -525,14 +653,12 @@ class System {
auto &thread = key_value.second.second;
thread.join();
}
network_.StopClient();
network_.StopServer();
}
Config &config() { return config_; }
Network &network() { return network_; }
Serialization &serialization() { return serialization_; }
private:
void StartReactor(Reactor& reactor) {
current_reactor_ = &reactor;
@ -546,7 +672,5 @@ class System {
std::unordered_map<std::string,
std::pair<std::unique_ptr<Reactor>, std::thread>>
reactors_;
Config config_;
Network network_;
Serialization serialization_;
};

View File

@ -0,0 +1,165 @@
#include <sstream>
#include "protocol.hpp"
#include "communication.hpp"
#include "glog/logging.h"
namespace Protocol {
Session::Session(Socket &&socket, Data &data)
: socket_(std::move(socket)), system_(data.system) {
event_.data.ptr = this;
}
bool Session::Alive() const { return alive_; }
std::string Session::GetString(SizeT len) {
std::string ret(reinterpret_cast<char *>(buffer_.data()), len);
buffer_.Shift(len);
return ret;
}
void Session::Execute() {
if (!handshake_done_) {
SizeT len_reactor = GetLength();
SizeT len_channel = GetLength(2);
if (len_reactor == 0 || len_channel == 0) return;
if (buffer_.size() < len_reactor + len_channel) return;
// remove the length bytes from the buffer
buffer_.Shift(2 * sizeof(SizeT));
reactor_ = GetString(len_reactor);
channel_ = GetString(len_channel);
std::cout << "Reactor: " << reactor_ << "; Channel: " << channel_
<< std::endl;
auto channel = system_->FindChannel(reactor_, channel_);
SendSuccess(channel != nullptr);
handshake_done_ = true;
}
SizeT len_data = GetLength();
if (len_data == 0) return;
if (buffer_.size() < len_data) return;
// remove the length bytes from the buffer
buffer_.Shift(sizeof(SizeT));
// TODO: check for exceptions
std::istringstream stream;
stream.str(std::string(reinterpret_cast<char*>(buffer_.data()), len_data));
cereal::BinaryInputArchive iarchive{stream};
std::unique_ptr<Message> message{nullptr};
iarchive(message);
buffer_.Shift(len_data);
auto channel = system_->FindChannel(reactor_, channel_);
if (channel == nullptr) {
SendSuccess(false);
return;
}
channel->Send(std::move(message));
SendSuccess(true);
}
StreamBuffer Session::Allocate() { return buffer_.Allocate(); }
void Session::Written(size_t len) { buffer_.Written(len); }
void Session::Close() {
DLOG(INFO) << "Closing session";
this->socket_.Close();
}
SizeT Session::GetLength(int offset) {
if (buffer_.size() - offset < sizeof(SizeT)) return 0;
SizeT ret = *reinterpret_cast<SizeT *>(buffer_.data() + offset);
return ret;
}
bool Session::SendSuccess(bool success) {
if (success) {
return socket_.Write("\x80");
}
return socket_.Write("\x40");
}
bool SendLength(Socket &socket, SizeT length) {
return socket.Write(reinterpret_cast<uint8_t *>(&length), sizeof(SizeT));
}
bool GetSuccess(Socket &socket) {
uint8_t val;
if (socket.Read(&val, 1) != 1) {
return false;
}
return val == 0x80;
}
bool SendMessage(std::string address, uint16_t port, std::string reactor,
std::string channel, std::unique_ptr<Message> message) {
// Initialize endpoint.
Endpoint endpoint;
try {
endpoint = Endpoint(address.c_str(), port);
} catch (io::network::NetworkEndpointException &e) {
LOG(INFO) << "Address is invalid!";
return false;
}
// Initialize socket.
Socket socket;
if (!socket.Connect(endpoint)) {
LOG(INFO) << "Couldn't connect to remote address: " << address << ":"
<< port;
return false;
}
// Send data
if (!SendLength(socket, reactor.size())) {
LOG(INFO) << "Couldn't send reactor size!";
return false;
}
if (!SendLength(socket, channel.size())) {
LOG(INFO) << "Couldn't send channel size!";
return false;
}
if (!socket.Write(reactor)) {
LOG(INFO) << "Couldn't send reactor data!";
return false;
}
if (!socket.Write(channel)) {
LOG(INFO) << "Couldn't send channel data!";
return false;
}
bool success = GetSuccess(socket);
if (message == nullptr or !success) {
return success;
}
// Serialize and send message
std::ostringstream stream;
cereal::BinaryOutputArchive oarchive(stream);
oarchive(message);
const std::string &buffer = stream.str();
if (!SendLength(socket, buffer.size())) {
LOG(INFO) << "Couldn't send message size!";
return false;
}
if (!socket.Write(buffer.data(), buffer.size())) {
LOG(INFO) << "Couldn't send message data!";
return false;
}
return GetSuccess(socket);
}
}

View File

@ -0,0 +1,135 @@
#pragma once
#include "communication/bolt/v1/decoder/buffer.hpp"
#include "io/network/epoll.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/socket.hpp"
#include "io/network/stream_buffer.hpp"
class Message;
class System;
/**
* @brief Protocol
*
* Has classes and functions that implement server and client sides of our
* distributed protocol.
*
* The protocol consists of two stages.
* The first stage is a handshake stage when the client sends to the server
* reactor and channel names which it wants to communicate with.
* The second stage is sending messages.
*
* HANDSHAKE
*
* Client sends:
* len_reactor_name(SizeT) len_channel_name(SizeT) reactor_name channel_name
* Server responds:
* 0x40 if the reactor/channel combo doesn't exist
* 0x80 if the reactor/channel combo exists
*
* MESSAGES
*
* Client sends:
* len_message(SizeT) cereal_encoded_binary_message
* Server responds:
* 0x40 if the reactor/channel combo doesn't exist or the message wasn't
* successfully decoded and delivered
* 0x80 if the reactor/channel combo exist and the message was successfully
* decoded and delivered
*
* Currently the server is implemented to handle more than one message after
* the initial handshake, but the client can only send one message.
*/
namespace Protocol {
using Endpoint = io::network::NetworkEndpoint;
using Socket = io::network::Socket;
using StreamBuffer = io::network::StreamBuffer;
// this buffer should be larger than the largest serialized message
using Buffer = communication::bolt::Buffer<262144>;
using SizeT = uint16_t;
/**
* Distributed Protocol Data
*
* This class is responsible for holding a pointer to System.
*/
struct Data {
Data(System *_system) : system(_system) {}
System *system;
};
/**
* Distributed Protocol Session
*
* This class is responsible for handling a single client connection.
*
* @tparam Socket type of socket (could be a network socket or test socket)
*/
class Session {
private:
public:
Session(Socket &&socket, Data &data);
/**
* Returns the protocol alive state
*/
bool Alive() const;
/**
* Executes the protocol after data has been read into the buffer.
* Goes through the protocol states in order to execute commands from the
* client.
*/
void Execute();
/**
* Allocates data from the internal buffer.
* Used in the underlying network stack to asynchronously read data
* from the client.
* @returns a StreamBuffer to the allocated internal data buffer
*/
StreamBuffer Allocate();
/**
* Notifies the internal buffer of written data.
* Used in the underlying network stack to notify the internal buffer
* how many bytes of data have been written.
* @param len how many data was written to the buffer
*/
void Written(size_t len);
/**
* Closes the session (client socket).
*/
void Close();
io::network::Epoll::Event event_;
Socket socket_;
private:
SizeT GetLength(int offset = 0);
std::string GetString(SizeT len);
bool SendSuccess(bool success);
bool alive_{true};
bool handshake_done_{false};
std::string reactor_{""};
std::string channel_{""};
Buffer buffer_;
System *system_;
};
/**
* Distributed Protocol Send Message
*
* This function sends a message to the specified server.
* If message is a nullptr then it only checks whether the remote reactor
* and channel exist, else it returns the complete message send success.
*/
bool SendMessage(std::string address, uint16_t port, std::string reactor,
std::string channel, std::unique_ptr<Message> message);
}

View File

@ -0,0 +1,100 @@
#include "communication.hpp"
class ChatMessage : public SenderMessage {
public:
ChatMessage() : SenderMessage(), message_("") {}
ChatMessage(std::string reactor, std::string channel, std::string message)
: SenderMessage(reactor, channel), message_(message) {}
std::string Message() const { return message_; }
template <class Archive>
void serialize(Archive &ar) {
ar(cereal::base_class<SenderMessage>(this), message_);
}
private:
std::string message_;
};
CEREAL_REGISTER_TYPE(ChatMessage);
class ChatACK : public ChatMessage {
public:
ChatACK() : ChatMessage() {}
ChatACK(std::string reactor, std::string channel, std::string message)
: ChatMessage(reactor, channel, message) {}
template <class Archive>
void serialize(Archive &ar) {
ar(cereal::base_class<ChatMessage>(this));
}
};
CEREAL_REGISTER_TYPE(ChatACK);
class ChatServer : public Reactor {
public:
ChatServer(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::cout << "ChatServer is active" << std::endl;
auto chat = Open("chat").first;
while (true) {
auto m = chat->AwaitEvent();
if (ChatACK *ack = dynamic_cast<ChatACK *>(m.get())) {
std::cout << "Received ACK from " << ack->Address() << ":"
<< ack->Port() << " -> '" << ack->Message() << "'"
<< std::endl;
} else if (ChatMessage *msg = dynamic_cast<ChatMessage *>(m.get())) {
std::cout << "Received message from " << msg->Address() << ":"
<< msg->Port() << " -> '" << msg->Message() << "'"
<< std::endl;
auto channel = msg->GetChannelToSender(system_);
if (channel != nullptr) {
channel->Send(
std::make_unique<ChatACK>("server", "chat", msg->Message()));
}
} else {
std::cerr << "Unknown message received!\n";
exit(1);
}
}
}
};
class ChatClient : public Reactor {
public:
ChatClient(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::cout << "ChatClient is active" << std::endl;
std::string address, message;
uint16_t port;
while (true) {
std::cout << "Enter IP, port and message to send." << std::endl;
std::cin >> address >> port >> message;
auto channel =
system_->network().Resolve(address, port, "server", "chat");
if (channel != nullptr) {
channel->Send(std::make_unique<ChatMessage>("server", "chat", message));
} else {
std::cerr << "Couldn't resolve that server!" << std::endl;
}
}
}
};
int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
System system;
system.StartServices();
system.Spawn<ChatServer>("server");
system.Spawn<ChatClient>("client");
system.AwaitShutdown();
return 0;
}

View File

@ -0,0 +1,15 @@
#include "communication.hpp"
int main(int argc, char *argv[]) {
google::InitGoogleLogging(argv[0]);
System system;
system.network().StartClient(1);
auto channel = system.network().Resolve("127.0.0.1", 10000, "master", "main");
std::cout << channel << std::endl;
if (channel != nullptr) {
auto message = std::make_unique<SenderMessage>("master", "main");
channel->Send(std::move(message));
}
system.network().StopClient();
return 0;
}