Callback interface consistency changes

Summary:
1. add a test for unsubscription service
2. change the callback to accept const subscription&
3. renamed Send -> SendHelper
4. created a more usable Send
5. changed the Send usages

Reviewers: buda, lion, sasa.stanko

Reviewed By: sasa.stanko

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D647
This commit is contained in:
Goran Zuzic 2017-08-08 17:22:08 +02:00
parent 54b7851f84
commit 045e14e139
7 changed files with 161 additions and 50 deletions

View File

@ -149,7 +149,7 @@ class Master : public Reactor {
}
}
stream->OnEvent(typeid(Message), [this](const Message &msg, EventStream::Subscription& subscription) {
stream->OnEvent(typeid(Message), [this](const Message &msg, const EventStream::Subscription& subscription) {
std::cout << "Processing Query via Callback" << std::endl;
const Query &query =
dynamic_cast<const Query &>(msg); // exception bad_cast
@ -178,12 +178,12 @@ class Master : public Reactor {
auto create_node_txn =
std::make_unique<CreateNodeTxn>("master", "main", xid);
channels_[worker_id]->Send(typeid(nullptr), std::move(create_node_txn));
channels_[worker_id]->SendHelper(typeid(nullptr), std::move(create_node_txn));
auto m = stream->AwaitEvent();
if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) {
req->GetChannelToSender(system_)->Send(typeid(nullptr), std::make_unique<CommitDirective>());
req->GetChannelToSender(system_)->Send<CommitDirective>();
} else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) {
req->GetChannelToSender(system_)->Send(typeid(nullptr), std::make_unique<AbortDirective>());
req->GetChannelToSender(system_)->Send<AbortDirective>();
} else {
std::cerr << "unknown message\n";
exit(1);
@ -197,8 +197,7 @@ class Master : public Reactor {
auto connector = Open(txn_channel_name);
auto stream = connector.first;
for (int w_id = 0; w_id < NUM_WORKERS; ++w_id)
channels_[w_id]->Send(typeid(nullptr),
std::make_unique<CountNodesTxn>("master", "main", xid));
channels_[w_id]->Send<CountNodesTxn>("master", "main", xid);
std::vector<std::shared_ptr<Channel>> txn_channels;
txn_channels.resize(NUM_WORKERS, nullptr);
@ -219,10 +218,10 @@ class Master : public Reactor {
if (commit) {
for (int w_id = 0; w_id < NUM_WORKERS; ++w_id)
txn_channels[w_id]->Send(typeid(nullptr), std::make_unique<CommitDirective>());
txn_channels[w_id]->Send<CommitDirective>();
} else {
for (int w_id = 0; w_id < NUM_WORKERS; ++w_id)
txn_channels[w_id]->Send(typeid(nullptr), std::make_unique<AbortDirective>());
txn_channels[w_id]->Send<AbortDirective>();
}
int64_t count = 0;
@ -307,7 +306,7 @@ class Worker : public Reactor {
auto stream = connector.first;
auto masterChannel = txn->GetChannelToSender(system_);
// TODO: Do the actual commit.
masterChannel->Send(typeid(nullptr),
masterChannel->SendHelper(typeid(nullptr),
std::make_unique<CommitRequest>("master", "main", worker_id_));
auto m = stream->AwaitEvent();
if (dynamic_cast<CommitDirective *>(m.get())) {
@ -329,11 +328,11 @@ class Worker : public Reactor {
// TODO: Fix this hack -- use the storage.
int num = 123;
masterChannel->Send(typeid(nullptr),
masterChannel->SendHelper(typeid(nullptr),
std::make_unique<CommitRequest>("master", "main", worker_id_));
auto m = stream->AwaitEvent();
if (dynamic_cast<CommitDirective *>(m.get())) {
masterChannel->Send(typeid(nullptr), std::make_unique<CountNodesTxnResult>(num));
masterChannel->SendHelper(typeid(nullptr), std::make_unique<CountNodesTxnResult>(num));
} else if (dynamic_cast<AbortDirective *>(m.get())) {
// send nothing
} else {
@ -371,9 +370,9 @@ void ClientMain(System *system) {
std::getline(std::cin, s);
if (s == "quit") {
active = false;
channel->Send(typeid(nullptr), std::make_unique<Quit>());
channel->SendHelper(typeid(nullptr), std::make_unique<Quit>());
} else {
channel->Send(typeid(nullptr), std::make_unique<Query>(s));
channel->SendHelper(typeid(nullptr), std::make_unique<Query>(s));
}
}
}

View File

@ -3,7 +3,7 @@
DEFINE_string(address, "127.0.0.1", "Network server bind address");
DEFINE_int32(port, 10000, "Network server bind port");
void EventStream::Subscription::unsubscribe() {
void EventStream::Subscription::unsubscribe() const {
event_queue_.RemoveCb(*this);
}

View File

@ -11,8 +11,9 @@
#include <stdexcept>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <typeindex>
#include <unordered_map>
#include <utility>
#include <gflags/gflags.h>
@ -45,7 +46,18 @@ extern thread_local Reactor* current_reactor_;
*/
class Channel {
public:
virtual void Send(const std::type_index&, std::unique_ptr<Message>) = 0;
/**
* Construct and send the message to the channel.
*/
template<typename MsgType, typename... Args>
void Send(Args&&... args) {
SendHelper(typeid(MsgType), std::unique_ptr<Message>(new MsgType(std::forward<Args>(args)...)));
}
template<typename MsgType>
void Send(std::unique_ptr<MsgType>&& msg_ptr) {
SendHelper(typeid(MsgType), std::move(msg_ptr));
}
virtual std::string Address() = 0;
@ -61,6 +73,8 @@ class Channel {
void serialize(Archive &archive) {
archive(Address(), Port(), ReactorName(), Name());
}
virtual void SendHelper(const std::type_index&, std::unique_ptr<Message>) = 0;
};
/**
@ -90,7 +104,7 @@ class EventStream {
/**
* Unsubscribe. Call only once.
*/
void unsubscribe();
void unsubscribe() const;
private:
friend class Reactor;
@ -104,7 +118,7 @@ class EventStream {
uint64_t cb_uid_;
};
typedef std::function<void(const Message&, Subscription&)> Callback;
typedef std::function<void(const Message&, const Subscription&)> Callback;
/**
* Register a callback that will be called whenever an event arrives.
@ -166,7 +180,7 @@ class Connector {
weak_queue_(queue),
system_(system) {}
virtual void Send(const std::type_index& tidx, std::unique_ptr<Message> m) {
virtual void SendHelper(const std::type_index& tidx, std::unique_ptr<Message> m) {
std::shared_ptr<Connector> queue_ = weak_queue_.lock(); // Atomic, per the standard.
if (queue_) {
// We guarantee here that the Connector is not destroyed.
@ -494,7 +508,7 @@ class Network {
virtual std::string Name() { return channel_; }
virtual void Send(const std::type_index &tidx, std::unique_ptr<Message> message) {
virtual void SendHelper(const std::type_index &tidx, std::unique_ptr<Message> message) {
network_->mutex_.lock();
network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_,
std::move(message)));

View File

@ -64,7 +64,7 @@ void Session::Execute() {
return;
}
channel->Send(typeid(nullptr), std::move(message));
channel->SendHelper(typeid(nullptr), std::move(message));
SendSuccess(true);
}

View File

@ -48,8 +48,8 @@ TEST(ConnectorSetUpTest, CheckMainChannelIsSet) {
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
CloseConnector("main");
}
};
@ -59,8 +59,8 @@ TEST(ConnectorSetUpTest, CheckMainChannelIsSet) {
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("master", "main")))
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
CloseConnector("main");
}
};
@ -83,8 +83,8 @@ TEST(SimpleSendTest, OneSimpleSend) {
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::seconds(1));
channel->Send(typeid(nullptr), std::make_unique<MessageInt>(123));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(123);
CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII.
}
};
@ -107,6 +107,42 @@ TEST(SimpleSendTest, OneSimpleSend) {
system.AwaitShutdown();
}
TEST(SimpleSendTest, OneCallback) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(888);
CloseConnector("main");
}
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
EventStream* stream = main_.first;
stream->OnEvent(typeid(MessageInt), [this](const Message& msg, const EventStream::Subscription&) {
const MessageInt& msgint = dynamic_cast<const MessageInt&>(msg);
ASSERT_EQ(msgint.x, 888);
CloseConnector("main");
});
}
};
System system;
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
TEST(SimpleSendTest, IgnoreAfterClose) {
struct MessageInt : public Message {
@ -119,12 +155,12 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::seconds(1));
channel->Send(typeid(nullptr), std::make_unique<MessageInt>(101));
channel->Send(typeid(nullptr), std::make_unique<MessageInt>(102));
std::this_thread::sleep_for(std::chrono::seconds(1));
channel->Send(typeid(nullptr), std::make_unique<MessageInt>(103)); // these ones should be ignored
channel->Send(typeid(nullptr), std::make_unique<MessageInt>(104));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(101);
channel->Send<MessageInt>(102);
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(103); // should be ignored
channel->Send<MessageInt>(104); // should be ignored
CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII.
}
};
@ -148,7 +184,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
}
TEST(SimpleSendTest, OnEvent) {
TEST(MultipleSendTest, UnsubscribeService) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
@ -158,19 +194,83 @@ TEST(SimpleSendTest, OnEvent) {
char x;
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageInt>(55);
channel->Send<MessageInt>(66);
channel->Send<MessageInt>(77);
channel->Send<MessageInt>(88);
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send<MessageChar>('a');
channel->Send<MessageChar>('b');
channel->Send<MessageChar>('c');
channel->Send<MessageChar>('d');
CloseConnector("main");
}
};
struct Worker : public Reactor {
Worker(System *system, std::string name) : Reactor(system, name) {}
int num_msgs_received = 0;
virtual void Run() {
EventStream* stream = main_.first;
stream->OnEvent(typeid(MessageInt), [this](const Message& msg, const EventStream::Subscription& subscription) {
const MessageInt& msgint = dynamic_cast<const MessageInt&>(msg);
ASSERT_TRUE(msgint.x == 55 || msgint.x == 66);
++num_msgs_received;
if (msgint.x == 66) {
subscription.unsubscribe(); // receive only two of them
}
});
stream->OnEvent(typeid(MessageChar), [this](const Message& msg, const EventStream::Subscription& subscription) {
const MessageChar& msgchar = dynamic_cast<const MessageChar&>(msg);
char c = msgchar.x;
++num_msgs_received;
ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c');
if (num_msgs_received == 5) {
subscription.unsubscribe();
CloseConnector("main");
}
});
}
};
System system;
system.Spawn<Master>("master");
system.Spawn<Worker>("worker");
system.AwaitShutdown();
}
TEST(MultipleSendTest, OnEvent) {
struct MessageInt : public Message {
MessageInt(int xx) : x(xx) {}
int x;
};
struct MessageChar : public Message {
MessageChar(char xx) : x(xx) {}
char x;
};
struct Master : public Reactor {
Master(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
std::shared_ptr<Channel> channel;
while (!(channel = system_->FindChannel("worker", "main")))
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(300));
channel->Send(typeid(MessageInt), std::make_unique<MessageInt>(101));
channel->Send(typeid(MessageChar), std::make_unique<MessageChar>('a'));
channel->Send(typeid(MessageInt), std::make_unique<MessageInt>(103)); // these ones should be ignored
channel->Send(typeid(MessageChar), std::make_unique<MessageChar>('b'));
CloseConnector("main"); // Write-end doesn't need to be closed because it's in RAII.
channel->Send<MessageInt>(101);
channel->Send<MessageChar>('a');
channel->Send<MessageInt>(103);
channel->Send<MessageChar>('b');
CloseConnector("main");
}
};
@ -184,24 +284,22 @@ TEST(SimpleSendTest, OnEvent) {
EventStream* stream = main_.first;
correct_vals = 0;
stream->OnEvent(typeid(MessageInt), [this](const Message& msg, EventStream::Subscription& subscription) {
stream->OnEvent(typeid(MessageInt), [this](const Message& msg, const EventStream::Subscription&) {
const MessageInt& msgint = dynamic_cast<const MessageInt&>(msg);
std::cout << "msg has int " << msgint.x << std::endl;
ASSERT_TRUE(msgint.x == 101 || msgint.x == 103);
++correct_vals;
main_.second->Send(typeid(EndMessage), std::make_unique<EndMessage>());
main_.second->Send<EndMessage>();
});
stream->OnEvent(typeid(MessageChar), [this](const Message& msg, EventStream::Subscription& subscription) {
stream->OnEvent(typeid(MessageChar), [this](const Message& msg, const EventStream::Subscription&) {
const MessageChar& msgchar = dynamic_cast<const MessageChar&>(msg);
std::cout << "msg has char " << msgchar.x << std::endl;
ASSERT_TRUE(msgchar.x == 'a' || msgchar.x == 'b');
++correct_vals;
main_.second->Send(typeid(EndMessage), std::make_unique<EndMessage>());
main_.second->Send<EndMessage>();
});
stream->OnEvent(typeid(EndMessage), [this](const Message&, EventStream::Subscription&) {
stream->OnEvent(typeid(EndMessage), [this](const Message&, const EventStream::Subscription&) {
ASSERT_LE(correct_vals, 4);
if (correct_vals == 4) {
CloseConnector("main");

View File

@ -54,7 +54,7 @@ class ChatServer : public Reactor {
<< std::endl;
auto channel = msg->GetChannelToSender(system_);
if (channel != nullptr) {
channel->Send(typeid(nullptr),
channel->SendHelper(typeid(nullptr),
std::make_unique<ChatACK>("server", "chat", msg->Message()));
}
} else {
@ -81,7 +81,7 @@ class ChatClient : public Reactor {
auto channel =
system_->network().Resolve(address, port, "server", "chat");
if (channel != nullptr) {
channel->Send(typeid(nullptr), std::make_unique<ChatMessage>("server", "chat", message));
channel->SendHelper(typeid(nullptr), std::make_unique<ChatMessage>("server", "chat", message));
} else {
std::cerr << "Couldn't resolve that server!" << std::endl;
}

View File

@ -8,7 +8,7 @@ int main(int argc, char *argv[]) {
std::cout << channel << std::endl;
if (channel != nullptr) {
auto message = std::make_unique<SenderMessage>("master", "main");
channel->Send(typeid(SenderMessage), std::move(message));
channel->SendHelper(typeid(SenderMessage), std::move(message));
}
system.network().StopClient();
return 0;