Add typed version of pop/await event functions

Reviewers: zuza

Reviewed By: zuza

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D646
This commit is contained in:
Sasa Stanko 2017-08-08 16:27:52 +02:00
parent 6ce504349f
commit 54b7851f84
4 changed files with 16 additions and 14 deletions

View File

@ -133,7 +133,7 @@ class Master : public Reactor {
std::cout << "Master is active" << std::endl;
while (true) {
auto m = stream->AwaitEvent().second;
auto m = stream->AwaitEvent();
if (Query *query = dynamic_cast<Query *>(m.get())) {
ProcessQuery(query);
break; // process only the first query
@ -179,7 +179,7 @@ class Master : public Reactor {
auto create_node_txn =
std::make_unique<CreateNodeTxn>("master", "main", xid);
channels_[worker_id]->Send(typeid(nullptr), std::move(create_node_txn));
auto m = stream->AwaitEvent().second;
auto m = stream->AwaitEvent();
if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) {
req->GetChannelToSender(system_)->Send(typeid(nullptr), std::make_unique<CommitDirective>());
} else if (AbortRequest *req = dynamic_cast<AbortRequest *>(m.get())) {
@ -204,7 +204,7 @@ class Master : public Reactor {
txn_channels.resize(NUM_WORKERS, nullptr);
bool commit = true;
for (int responds = 0; responds < NUM_WORKERS; ++responds) {
auto m = stream->AwaitEvent().second;
auto m = stream->AwaitEvent();
if (CommitRequest *req = dynamic_cast<CommitRequest *>(m.get())) {
txn_channels[req->worker_id()] = req->GetChannelToSender(system_);
commit &= true;
@ -227,7 +227,7 @@ class Master : public Reactor {
int64_t count = 0;
for (int responds = 0; responds < NUM_WORKERS; ++responds) {
auto m = stream->AwaitEvent().second;
auto m = stream->AwaitEvent();
if (CountNodesTxnResult *cnt =
dynamic_cast<CountNodesTxnResult *>(m.get())) {
count += cnt->count();
@ -280,7 +280,7 @@ class Worker : public Reactor {
auto stream = main_.first;
FindMaster();
while (true) {
auto m = stream->AwaitEvent().second;
auto m = stream->AwaitEvent();
if (Txn *txn = dynamic_cast<Txn *>(m.get())) {
HandleTransaction(txn);
} else {
@ -309,7 +309,7 @@ class Worker : public Reactor {
// TODO: Do the actual commit.
masterChannel->Send(typeid(nullptr),
std::make_unique<CommitRequest>("master", "main", worker_id_));
auto m = stream->AwaitEvent().second;
auto m = stream->AwaitEvent();
if (dynamic_cast<CommitDirective *>(m.get())) {
// TODO: storage_.CreateNode();
} else if (dynamic_cast<AbortDirective *>(m.get())) {
@ -331,7 +331,7 @@ class Worker : public Reactor {
masterChannel->Send(typeid(nullptr),
std::make_unique<CommitRequest>("master", "main", worker_id_));
auto m = stream->AwaitEvent().second;
auto m = stream->AwaitEvent();
if (dynamic_cast<CommitDirective *>(m.get())) {
masterChannel->Send(typeid(nullptr), std::make_unique<CountNodesTxnResult>(num));
} else if (dynamic_cast<AbortDirective *>(m.get())) {

View File

@ -71,12 +71,14 @@ class EventStream {
/**
* Blocks until a message arrives.
*/
virtual std::pair<std::type_index, std::unique_ptr<Message>> AwaitEvent() = 0;
virtual std::pair<std::type_index, std::unique_ptr<Message>> AwaitTypedEvent() = 0;
std::unique_ptr<Message> AwaitEvent() { return AwaitTypedEvent().second; }
/**
* Polls if there is a message available, returning null if there is none.
*/
virtual std::pair<std::type_index, std::unique_ptr<Message>> PopEvent() = 0;
virtual std::pair<std::type_index, std::unique_ptr<Message>> PopTypedEvent() = 0;
std::unique_ptr<Message> PopEvent() { return PopTypedEvent().second; }
/**
* Subscription Service.
@ -200,11 +202,11 @@ class Connector {
LocalEventStream(std::shared_ptr<std::mutex> mutex, std::string connector_name,
Connector *queue) : mutex_(mutex), connector_name_(connector_name), queue_(queue) {}
std::pair<std::type_index, std::unique_ptr<Message>> AwaitEvent() {
std::pair<std::type_index, std::unique_ptr<Message>> AwaitTypedEvent() {
std::unique_lock<std::mutex> lock(*mutex_);
return queue_->LockedAwaitPop(lock);
}
std::pair<std::type_index, std::unique_ptr<Message>> PopEvent() {
std::pair<std::type_index, std::unique_ptr<Message>> PopTypedEvent() {
std::unique_lock<std::mutex> lock(*mutex_);
return queue_->LockedPop();
}

View File

@ -93,7 +93,7 @@ TEST(SimpleSendTest, OneSimpleSend) {
Worker(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
EventStream* stream = main_.first;
std::unique_ptr<Message> m_uptr = stream->AwaitEvent().second;
std::unique_ptr<Message> m_uptr = stream->AwaitEvent();
CloseConnector("main");
MessageInt* msg = dynamic_cast<MessageInt *>(m_uptr.get());
ASSERT_NE(msg, nullptr);
@ -133,7 +133,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
Worker(System *system, std::string name) : Reactor(system, name) {}
virtual void Run() {
EventStream* stream = main_.first;
std::unique_ptr<Message> m_uptr = stream->AwaitEvent().second;
std::unique_ptr<Message> m_uptr = stream->AwaitEvent();
CloseConnector("main");
MessageInt* msg = dynamic_cast<MessageInt *>(m_uptr.get());
ASSERT_NE(msg, nullptr);

View File

@ -43,7 +43,7 @@ class ChatServer : public Reactor {
auto chat = Open("chat").first;
while (true) {
auto m = chat->AwaitEvent().second;
auto m = chat->AwaitEvent();
if (ChatACK *ack = dynamic_cast<ChatACK *>(m.get())) {
std::cout << "Received ACK from " << ack->Address() << ":"
<< ack->Port() << " -> '" << ack->Message() << "'"