Fix FindChannel implementation
Summary: Change queue implementation in distributed reactor Fix FindChannel implementations Reviewers: mtomic, buda, dgleich Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D986
This commit is contained in:
parent
6d78873ace
commit
2d6675df63
@ -39,45 +39,33 @@ RaftMember::RaftMember(System &system, const std::string &id,
|
||||
}
|
||||
}
|
||||
},
|
||||
true) {
|
||||
system.Spawn(id, [this](Reactor &r) {
|
||||
EventStream *stream = r.main_.first;
|
||||
true),
|
||||
reactor_(system.Spawn(id, [this](Reactor &r) {
|
||||
EventStream *stream = r.main_.first;
|
||||
|
||||
stream->OnEvent<MLeaderTimeout>([this](
|
||||
const MLeaderTimeout &, const Subscription &) { RunElection(); });
|
||||
stream->OnEvent<MLeaderTimeout>([this](
|
||||
const MLeaderTimeout &, const Subscription &) { RunElection(); });
|
||||
|
||||
stream->OnEvent<MRequestVote>(
|
||||
[this](const MRequestVote &req, const Subscription &) {
|
||||
network_.RequestVoteReply(req.sender_id, OnRequestVote(req));
|
||||
});
|
||||
stream->OnEvent<MRequestVoteReply>(
|
||||
[this](const MRequestVoteReply &req, const Subscription &) {
|
||||
OnRequestVoteReply(req);
|
||||
});
|
||||
stream->OnEvent<MRequestVote>(
|
||||
[this](const MRequestVote &req, const Subscription &) {
|
||||
network_.RequestVoteReply(req.sender_id, OnRequestVote(req));
|
||||
});
|
||||
stream->OnEvent<MRequestVoteReply>(
|
||||
[this](const MRequestVoteReply &req, const Subscription &) {
|
||||
OnRequestVoteReply(req);
|
||||
});
|
||||
|
||||
stream->OnEvent<MAppendEntries>(
|
||||
[this](const MAppendEntries &req, const Subscription &) {
|
||||
network_.AppendEntriesReply(req.sender_id, OnAppendEntries(req));
|
||||
});
|
||||
stream->OnEvent<MAppendEntriesReply>(
|
||||
[this](const MAppendEntriesReply &rep, const Subscription &) {
|
||||
OnAppendEntriesReply(rep);
|
||||
});
|
||||
stream->OnEvent<MAppendEntries>(
|
||||
[this](const MAppendEntries &req, const Subscription &) {
|
||||
network_.AppendEntriesReply(req.sender_id, OnAppendEntries(req));
|
||||
});
|
||||
stream->OnEvent<MAppendEntriesReply>(
|
||||
[this](const MAppendEntriesReply &rep, const Subscription &) {
|
||||
OnAppendEntriesReply(rep);
|
||||
});
|
||||
})) {}
|
||||
|
||||
stream->OnEvent<MShutdown>([&r](const MShutdown &, const Subscription &) {
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
RaftMember::~RaftMember() {
|
||||
LogInfo("Shutting down...");
|
||||
|
||||
auto channel = system_.FindChannel(id_, "main");
|
||||
if (channel) {
|
||||
channel->Send<MShutdown>();
|
||||
}
|
||||
}
|
||||
RaftMember::~RaftMember() { LogInfo("Shutting down..."); }
|
||||
|
||||
template <class... Args>
|
||||
void RaftMember::LogInfo(const std::string &format, Args &&... args) {
|
||||
|
@ -49,6 +49,8 @@ class RaftMember {
|
||||
Watchdog leader_watchdog_;
|
||||
Watchdog heartbeat_watchdog_;
|
||||
|
||||
std::unique_ptr<reactor::Reactor> reactor_;
|
||||
|
||||
void RunElection();
|
||||
void TransitionToFollower();
|
||||
void TransitionToCandidate();
|
||||
|
@ -7,7 +7,6 @@
|
||||
namespace communication::raft {
|
||||
|
||||
struct MLeaderTimeout : public communication::reactor::Message {};
|
||||
struct MShutdown : public communication::reactor::Message {};
|
||||
|
||||
struct RaftMessage : public communication::reactor::Message {
|
||||
RaftMessage(int term, const std::string &sender_id)
|
||||
@ -98,7 +97,8 @@ class LocalReactorNetworkInterface : public RaftNetworkInterface {
|
||||
|
||||
class FakeNetworkInterface : public RaftNetworkInterface {
|
||||
public:
|
||||
explicit FakeNetworkInterface(communication::reactor::System &system) : system_(system) {}
|
||||
explicit FakeNetworkInterface(communication::reactor::System &system)
|
||||
: system_(system) {}
|
||||
|
||||
bool RequestVote(const std::string &recipient,
|
||||
const MRequestVote &msg) override {
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include <gflags/gflags.h>
|
||||
|
||||
#include "communication/reactor/reactor_local.hpp"
|
||||
#include "data_structures/queue.hpp"
|
||||
#include "protocol.hpp"
|
||||
|
||||
#include "cereal/archives/binary.hpp"
|
||||
@ -70,46 +71,21 @@ class Network {
|
||||
public:
|
||||
Network() = default;
|
||||
|
||||
// client functions
|
||||
|
||||
std::shared_ptr<ChannelWriter> Resolve(std::string address, uint16_t port,
|
||||
std::string reactor_name,
|
||||
std::string channel_name) {
|
||||
if (SendMessage(address, port, reactor_name, channel_name, nullptr)) {
|
||||
return std::make_shared<RemoteChannelWriter>(this, address, port,
|
||||
reactor_name, channel_name);
|
||||
}
|
||||
LOG(WARNING) << "Could not resolve " << address << ":" << port << " "
|
||||
<< reactor_name << "/" << channel_name;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
/** Start a threadpool that dispatches the messages from the (outgoing) queue
|
||||
* to the sockets */
|
||||
void StartClient(int worker_count) {
|
||||
LOG(INFO) << "Starting " << worker_count << " client workers";
|
||||
client_run_ = true;
|
||||
|
||||
// condition variables here...
|
||||
for (int i = 0; i < worker_count; ++i) {
|
||||
pool_.push_back(std::thread([this]() {
|
||||
while (this->client_run_) {
|
||||
this->mutex_.lock();
|
||||
if (!this->queue_.empty()) {
|
||||
NetworkMessage nm(std::move(this->queue_.front()));
|
||||
this->queue_.pop();
|
||||
this->mutex_.unlock();
|
||||
// TODO: store success
|
||||
bool success = SendMessage(nm.address, nm.port, nm.reactor,
|
||||
nm.channel, std::move(nm.message));
|
||||
DLOG(INFO) << "Network client message send status: " << success
|
||||
<< std::endl;
|
||||
} else {
|
||||
this->mutex_.unlock();
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
while (true) {
|
||||
auto message = queue_.AwaitPop();
|
||||
if (message == std::experimental::nullopt) break;
|
||||
SendMessage(message->address, message->port, message->reactor,
|
||||
message->channel, std::move(message->message));
|
||||
}
|
||||
}));
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
}
|
||||
}
|
||||
|
||||
@ -120,7 +96,7 @@ class Network {
|
||||
break;
|
||||
}
|
||||
}
|
||||
client_run_ = false;
|
||||
queue_.Signal();
|
||||
for (size_t i = 0; i < pool_.size(); ++i) {
|
||||
pool_[i].join();
|
||||
}
|
||||
@ -148,8 +124,8 @@ class Network {
|
||||
|
||||
void Send(std::unique_ptr<Message> message) override {
|
||||
std::lock_guard<SpinLock> lock(network_->mutex_);
|
||||
network_->queue_.push(NetworkMessage(address_, port_, reactor_, channel_,
|
||||
std::move(message)));
|
||||
network_->queue_.Emplace(address_, port_, reactor_, channel_,
|
||||
std::move(message));
|
||||
}
|
||||
|
||||
private:
|
||||
@ -200,8 +176,7 @@ class Network {
|
||||
// client variables
|
||||
SpinLock mutex_;
|
||||
std::vector<std::thread> pool_;
|
||||
std::queue<NetworkMessage> queue_;
|
||||
std::atomic<bool> client_run_;
|
||||
Queue<NetworkMessage> queue_;
|
||||
|
||||
// server variables
|
||||
std::thread thread_;
|
||||
@ -221,15 +196,15 @@ class DistributedSystem : public ChannelFinder {
|
||||
}
|
||||
|
||||
// Thread safe.
|
||||
void Spawn(const std::string &name, std::function<void(Reactor &)> setup) {
|
||||
system_.Spawn(name, setup, this);
|
||||
std::unique_ptr<Reactor> Spawn(const std::string &name,
|
||||
std::function<void(Reactor &)> setup) {
|
||||
return system_.Spawn(name, setup, this);
|
||||
}
|
||||
|
||||
// Non-thread safe.
|
||||
// TODO: figure out what should be intereaction of this function and
|
||||
// TODO: figure out what should be interection of this function and
|
||||
// destructor.
|
||||
void StopServices() {
|
||||
system_.AwaitShutdown();
|
||||
network_.StopClient();
|
||||
network_.StopServer();
|
||||
}
|
||||
@ -250,14 +225,8 @@ class DistributedSystem : public ChannelFinder {
|
||||
const std::string &address, uint16_t port,
|
||||
const std::string &reactor_name,
|
||||
const std::string &channel_name) override {
|
||||
// Yeah... Unneeded shared ptr... once again. We love that.
|
||||
std::shared_ptr<ChannelWriter> channel_writer = nullptr;
|
||||
// TODO: Check if this is actually local channel.
|
||||
while (!(channel_writer =
|
||||
network_.Resolve(address, port, reactor_name, channel_name))) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
}
|
||||
return channel_writer;
|
||||
return std::make_shared<Network::RemoteChannelWriter>(
|
||||
&network_, address, port, reactor_name, channel_name);
|
||||
}
|
||||
|
||||
Network &network() { return network_; }
|
||||
|
@ -1,9 +1,13 @@
|
||||
#include "communication/reactor/reactor_local.hpp"
|
||||
|
||||
#include <chrono>
|
||||
|
||||
#include "utils/exceptions.hpp"
|
||||
|
||||
namespace communication::reactor {
|
||||
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
void EventStream::Subscription::Unsubscribe() const {
|
||||
event_queue_.RemoveCallback(*this);
|
||||
}
|
||||
@ -18,13 +22,59 @@ std::string Channel::LocalChannelWriter::ReactorName() const {
|
||||
return reactor_name_;
|
||||
}
|
||||
|
||||
void Channel::LocalChannelWriter::Send(std::unique_ptr<Message> m) {
|
||||
// Atomic, per the standard. We guarantee here that if channel exists it
|
||||
// will not be destroyed by the end of this function.
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
std::shared_ptr<Channel> queue = queue_.lock();
|
||||
// Check if cached queue exists and send message.
|
||||
if (queue) {
|
||||
queue->Push(std::move(m));
|
||||
break;
|
||||
}
|
||||
// If it doesn't exist. Check if there is a new channel with same name.
|
||||
auto new_channel = system_.FindChannel(reactor_name_, channel_name_);
|
||||
auto t =
|
||||
std::dynamic_pointer_cast<Channel::LocalChannelWriter>(new_channel);
|
||||
CHECK(t) << "t is of unexpected type";
|
||||
queue_ = t->queue_;
|
||||
}
|
||||
}
|
||||
|
||||
std::string Channel::LocalChannelWriter::Name() const { return channel_name_; }
|
||||
|
||||
void Channel::Close() {
|
||||
// TODO(zuza): there will be major problems if a reactor tries to close a
|
||||
// stream that isn't theirs luckily this should never happen if the framework
|
||||
// is used as expected.
|
||||
reactor_.CloseChannel(channel_name_);
|
||||
std::shared_ptr<Channel::LocalChannelWriter> Channel::LockedOpenChannel() {
|
||||
// TODO(zuza): fix this CHECK using this answer
|
||||
// https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned
|
||||
// TODO: figure out zuza's TODO. Does that mean this CHECK is kind of flaky
|
||||
// or that it doesn't fail sometimes, when it should.
|
||||
CHECK(!self_ptr_.expired());
|
||||
return std::make_shared<LocalChannelWriter>(reactor_name_, channel_name_,
|
||||
self_ptr_, reactor_.system_);
|
||||
}
|
||||
|
||||
void Channel::Close() { reactor_.CloseChannel(channel_name_); }
|
||||
|
||||
Reactor::Reactor(ChannelFinder &system, const std::string &name,
|
||||
const std::function<void(Reactor &)> &setup, System &system2)
|
||||
: system_(system),
|
||||
system2_(system2),
|
||||
name_(name),
|
||||
setup_(setup),
|
||||
main_(Open("main")),
|
||||
thread_([this] {
|
||||
setup_(*this);
|
||||
RunEventLoop();
|
||||
system2_.RemoveReactor(name_);
|
||||
}) {}
|
||||
|
||||
Reactor::~Reactor() {
|
||||
{
|
||||
std::unique_lock<std::mutex> guard(*mutex_);
|
||||
channels_.clear();
|
||||
}
|
||||
cvar_->notify_all();
|
||||
thread_.join();
|
||||
}
|
||||
|
||||
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Reactor::Open(
|
||||
@ -77,29 +127,18 @@ void Reactor::CloseChannel(const std::string &s) {
|
||||
}
|
||||
|
||||
void Reactor::RunEventLoop() {
|
||||
bool exit_event_loop = false;
|
||||
|
||||
while (true) {
|
||||
// Find (or wait) for the next Message.
|
||||
PendingMessageInfo info;
|
||||
{
|
||||
std::unique_lock<std::mutex> guard(*mutex_);
|
||||
|
||||
while (true) {
|
||||
// Not fair because was taken earlier, talk to lion.
|
||||
// Exit the loop if there are no more Channels.
|
||||
cvar_->wait_for(guard, 200ms, [&] {
|
||||
if (channels_.empty()) return true;
|
||||
info = GetPendingMessages();
|
||||
if (info.message != nullptr) break;
|
||||
|
||||
// Exit the loop if there are no more Channels.
|
||||
if (channels_.empty()) {
|
||||
exit_event_loop = true;
|
||||
break;
|
||||
}
|
||||
|
||||
cvar_->wait(guard);
|
||||
}
|
||||
|
||||
if (exit_event_loop) break;
|
||||
return static_cast<bool>(info.message);
|
||||
});
|
||||
if (channels_.empty()) break;
|
||||
}
|
||||
|
||||
for (auto &callback_info : info.callbacks) {
|
||||
|
@ -289,6 +289,8 @@ class Channel {
|
||||
* Messages sent to a closed channel are ignored.
|
||||
* There can be multiple LocalChannelWriters refering to the same stream if
|
||||
* needed.
|
||||
*
|
||||
* It must outlive System.
|
||||
*/
|
||||
class LocalChannelWriter : public ChannelWriter {
|
||||
public:
|
||||
@ -296,22 +298,14 @@ class Channel {
|
||||
|
||||
LocalChannelWriter(const std::string &reactor_name,
|
||||
const std::string &channel_name,
|
||||
const std::weak_ptr<Channel> &queue)
|
||||
const std::weak_ptr<Channel> &queue,
|
||||
ChannelFinder &system)
|
||||
: reactor_name_(reactor_name),
|
||||
channel_name_(channel_name),
|
||||
queue_(queue) {}
|
||||
|
||||
void Send(std::unique_ptr<Message> m) override {
|
||||
// Atomic, per the standard. We guarantee here that if channel exists it
|
||||
// will not be destroyed by the end of this function.
|
||||
std::shared_ptr<Channel> queue = queue_.lock();
|
||||
if (queue) {
|
||||
queue->Push(std::move(m));
|
||||
}
|
||||
// TODO: what should we do here? Channel doesn't exist so message will be
|
||||
// lost.
|
||||
}
|
||||
queue_(queue),
|
||||
system_(system) {}
|
||||
|
||||
void Send(std::unique_ptr<Message> m) override;
|
||||
std::string ReactorName() const override;
|
||||
std::string Name() const override;
|
||||
|
||||
@ -319,6 +313,7 @@ class Channel {
|
||||
std::string reactor_name_;
|
||||
std::string channel_name_;
|
||||
std::weak_ptr<Channel> queue_;
|
||||
ChannelFinder &system_;
|
||||
};
|
||||
|
||||
/**
|
||||
@ -381,16 +376,7 @@ class Channel {
|
||||
cvar_->notify_one();
|
||||
}
|
||||
|
||||
std::shared_ptr<LocalChannelWriter> LockedOpenChannel() {
|
||||
// TODO(zuza): fix this CHECK using this answer
|
||||
// https://stackoverflow.com/questions/45507041/how-to-check-if-weak-ptr-is-empty-non-assigned
|
||||
// TODO: figure out zuza's TODO. Does that mean this CHECK is kind of flaky
|
||||
// or that it doesn't fail sometimes, when it should.
|
||||
CHECK(!self_ptr_.expired());
|
||||
return std::make_shared<LocalChannelWriter>(reactor_name_, channel_name_,
|
||||
self_ptr_);
|
||||
}
|
||||
|
||||
std::shared_ptr<LocalChannelWriter> LockedOpenChannel();
|
||||
std::unique_ptr<Message> LockedPop() { return LockedRawPop(); }
|
||||
|
||||
void LockedOnEventHelper(std::type_index type_index,
|
||||
@ -444,12 +430,10 @@ class Channel {
|
||||
class Reactor {
|
||||
friend class System;
|
||||
|
||||
Reactor(ChannelFinder &system, const std::string &name,
|
||||
const std::function<void(Reactor &)> &setup)
|
||||
: system_(system), name_(name), setup_(setup), main_(Open("main")) {}
|
||||
|
||||
public:
|
||||
~Reactor() {}
|
||||
Reactor(ChannelFinder &system, const std::string &name,
|
||||
const std::function<void(Reactor &)> &setup, System &system2);
|
||||
~Reactor();
|
||||
|
||||
std::pair<EventStream *, std::shared_ptr<ChannelWriter>> Open(
|
||||
const std::string &s);
|
||||
@ -474,6 +458,7 @@ class Reactor {
|
||||
Reactor &operator=(Reactor &&other) = default;
|
||||
|
||||
ChannelFinder &system_;
|
||||
System &system2_;
|
||||
std::string name_;
|
||||
std::function<void(Reactor &)> setup_;
|
||||
|
||||
@ -502,6 +487,8 @@ class Reactor {
|
||||
callbacks;
|
||||
};
|
||||
|
||||
std::thread thread_;
|
||||
|
||||
/**
|
||||
* Dispatches all waiting messages to callbacks. Shuts down when there are no
|
||||
* callbacks left.
|
||||
@ -520,21 +507,18 @@ class System : public ChannelFinder {
|
||||
friend class Reactor;
|
||||
System() = default;
|
||||
|
||||
void Spawn(const std::string &name, std::function<void(Reactor &)> setup,
|
||||
ChannelFinder *finder = nullptr) {
|
||||
std::unique_ptr<Reactor> Spawn(const std::string &name,
|
||||
std::function<void(Reactor &)> setup,
|
||||
ChannelFinder *finder = nullptr) {
|
||||
if (!finder) {
|
||||
finder = this;
|
||||
}
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
CHECK(reactors_.find(name) == reactors_.end())
|
||||
<< "Reactor with name: '" << name << "' already exists.";
|
||||
std::unique_ptr<Reactor> reactor(new Reactor(*finder, name, setup));
|
||||
std::thread reactor_thread([reactor = reactor.get()] {
|
||||
reactor->setup_(*reactor);
|
||||
reactor->RunEventLoop();
|
||||
});
|
||||
reactors_.emplace(name, std::pair<decltype(reactor), std::thread>{
|
||||
std::move(reactor), std::move(reactor_thread)});
|
||||
auto reactor = std::make_unique<Reactor>(*finder, name, setup, *this);
|
||||
reactors_.emplace(name, reactor.get());
|
||||
return reactor;
|
||||
}
|
||||
|
||||
std::shared_ptr<ChannelWriter> FindChannel(
|
||||
@ -542,8 +526,10 @@ class System : public ChannelFinder {
|
||||
const std::string &channel_name) override {
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
auto it_reactor = reactors_.find(reactor_name);
|
||||
if (it_reactor == reactors_.end()) return nullptr;
|
||||
return it_reactor->second.first->FindChannel(channel_name);
|
||||
if (it_reactor == reactors_.end())
|
||||
return std::shared_ptr<ChannelWriter>(new Channel::LocalChannelWriter(
|
||||
reactor_name, channel_name, {}, *this));
|
||||
return it_reactor->second->FindChannel(channel_name);
|
||||
}
|
||||
|
||||
std::shared_ptr<ChannelWriter> FindChannel(const std::string &, uint16_t,
|
||||
@ -556,16 +542,11 @@ class System : public ChannelFinder {
|
||||
LOG(FATAL) << "Tried to resolve remote channel in local System";
|
||||
}
|
||||
|
||||
// TODO: Think about interaction with destructor. Should we call this in
|
||||
// destructor, complain in destructor if there are alive threads or stop
|
||||
// them
|
||||
// in some way.
|
||||
void AwaitShutdown() {
|
||||
for (auto &key_value : reactors_) {
|
||||
auto &thread = key_value.second.second;
|
||||
thread.join();
|
||||
}
|
||||
reactors_.clear();
|
||||
void RemoveReactor(const std::string &name_) {
|
||||
std::unique_lock<std::mutex> guard(mutex_);
|
||||
auto it = reactors_.find(name_);
|
||||
CHECK(it != reactors_.end()) << "Trying to delete notexisting reactor";
|
||||
reactors_.erase(it);
|
||||
}
|
||||
|
||||
private:
|
||||
@ -575,9 +556,7 @@ class System : public ChannelFinder {
|
||||
System &operator=(System &&) = delete;
|
||||
|
||||
std::mutex mutex_;
|
||||
std::unordered_map<std::string,
|
||||
std::pair<std::unique_ptr<Reactor>, std::thread>>
|
||||
reactors_;
|
||||
std::unordered_map<std::string, Reactor *> reactors_;
|
||||
};
|
||||
|
||||
using Subscription = Channel::LocalEventStream::Subscription;
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <cstdint>
|
||||
#include <experimental/optional>
|
||||
@ -17,6 +18,8 @@ class Queue {
|
||||
Queue(Queue &&) = delete;
|
||||
Queue &operator=(Queue &&) = delete;
|
||||
|
||||
~Queue() { Signal(); }
|
||||
|
||||
void Push(T x) {
|
||||
std::unique_lock<std::mutex> guard(mutex_);
|
||||
queue_.emplace(std::move(x));
|
||||
@ -42,23 +45,35 @@ class Queue {
|
||||
return queue_.empty();
|
||||
}
|
||||
|
||||
T AwaitPop() {
|
||||
// Block until there is an element in the queue and then pop it from the queue
|
||||
// and return it. Function can return nullopt only if Queue is signaled via
|
||||
// Signal function.
|
||||
std::experimental::optional<T> AwaitPop() {
|
||||
std::unique_lock<std::mutex> guard(mutex_);
|
||||
cvar_.wait(guard, [this]() { return !queue_.empty(); });
|
||||
auto x = std::move(queue_.front());
|
||||
cvar_.wait(guard, [this] { return !queue_.empty() || signaled_; });
|
||||
if (queue_.empty()) return std::experimental::nullopt;
|
||||
std::experimental::optional<T> x(std::move(queue_.front()));
|
||||
queue_.pop();
|
||||
return x;
|
||||
}
|
||||
|
||||
// Nonblocking version of above function.
|
||||
std::experimental::optional<T> MaybePop() {
|
||||
std::unique_lock<std::mutex> guard(mutex_);
|
||||
if (queue_.empty()) return std::experimental::nullopt;
|
||||
auto x = std::move(queue_.front());
|
||||
std::experimental::optional<T> x(std::move(queue_.front()));
|
||||
queue_.pop();
|
||||
return x;
|
||||
}
|
||||
|
||||
// Notify all threads waiting on conditional variable to stop waiting.
|
||||
void Signal() {
|
||||
signaled_ = true;
|
||||
cvar_.notify_all();
|
||||
}
|
||||
|
||||
private:
|
||||
std::atomic<bool> signaled_{false};
|
||||
std::queue<T> queue_;
|
||||
std::condition_variable cvar_;
|
||||
mutable std::mutex mutex_;
|
||||
|
@ -51,8 +51,6 @@ milliseconds InitialElection(const RaftConfig &config) {
|
||||
end = std::chrono::system_clock::now();
|
||||
}
|
||||
|
||||
sys.AwaitShutdown();
|
||||
|
||||
return std::chrono::duration_cast<milliseconds>(end - start);
|
||||
}
|
||||
|
||||
@ -104,8 +102,6 @@ milliseconds Reelection(const RaftConfig &config) {
|
||||
end = std::chrono::system_clock::now();
|
||||
}
|
||||
|
||||
sys.AwaitShutdown();
|
||||
|
||||
return std::chrono::duration_cast<milliseconds>(end - start);
|
||||
}
|
||||
|
||||
@ -125,7 +121,7 @@ std::vector<milliseconds> RunTest(const std::string &name,
|
||||
return results;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
int main(int, char *argv[]) {
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
RaftConfig config{{"a", "b", "c", "d", "e"}, 150ms, 300ms, 70ms};
|
||||
|
@ -78,11 +78,18 @@ TEST(Queue, AwaitPop) {
|
||||
q.Push(4);
|
||||
});
|
||||
|
||||
EXPECT_EQ(q.AwaitPop(), 1);
|
||||
EXPECT_EQ(q.AwaitPop(), 2);
|
||||
EXPECT_EQ(q.AwaitPop(), 3);
|
||||
EXPECT_EQ(q.AwaitPop(), 4);
|
||||
EXPECT_EQ(*q.AwaitPop(), 1);
|
||||
EXPECT_EQ(*q.AwaitPop(), 2);
|
||||
EXPECT_EQ(*q.AwaitPop(), 3);
|
||||
EXPECT_EQ(*q.AwaitPop(), 4);
|
||||
t.join();
|
||||
|
||||
std::thread t2([&] {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
q.Signal();
|
||||
});
|
||||
EXPECT_EQ(q.AwaitPop(), std::experimental::nullopt);
|
||||
t2.join();
|
||||
}
|
||||
|
||||
TEST(Queue, Concurrent) {
|
||||
@ -112,7 +119,7 @@ TEST(Queue, Concurrent) {
|
||||
while (true) {
|
||||
int count = num_retrieved++;
|
||||
if (count >= kNumProducers * kNumElementsPerProducer) break;
|
||||
retrieved[thread_id].push_back(q.AwaitPop());
|
||||
retrieved[thread_id].push_back(*q.AwaitPop());
|
||||
}
|
||||
},
|
||||
i);
|
||||
|
@ -27,8 +27,8 @@ TEST(Raft, InitialElection) {
|
||||
{
|
||||
std::vector<std::unique_ptr<RaftMemberTest>> members;
|
||||
for (const auto &member_id : test_config.members) {
|
||||
members.push_back(
|
||||
std::make_unique<RaftMemberTest>(sys, member_id, test_config, network));
|
||||
members.push_back(std::make_unique<RaftMemberTest>(sys, member_id,
|
||||
test_config, network));
|
||||
network.Connect(member_id);
|
||||
}
|
||||
|
||||
@ -39,7 +39,6 @@ TEST(Raft, InitialElection) {
|
||||
EXPECT_EQ(member->Leader(), leader);
|
||||
}
|
||||
}
|
||||
sys.AwaitShutdown();
|
||||
}
|
||||
|
||||
TEST(Raft, Reelection) {
|
||||
@ -49,8 +48,8 @@ TEST(Raft, Reelection) {
|
||||
{
|
||||
std::vector<std::unique_ptr<RaftMemberTest>> members;
|
||||
for (const auto &member_id : test_config.members) {
|
||||
members.push_back(
|
||||
std::make_unique<RaftMemberTest>(sys, member_id, test_config, network));
|
||||
members.push_back(std::make_unique<RaftMemberTest>(sys, member_id,
|
||||
test_config, network));
|
||||
network.Connect(member_id);
|
||||
}
|
||||
|
||||
@ -76,6 +75,4 @@ TEST(Raft, Reelection) {
|
||||
EXPECT_EQ(member->Leader(), second_leader);
|
||||
}
|
||||
}
|
||||
|
||||
sys.AwaitShutdown();
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
using namespace communication::reactor;
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
struct MessageInt : public Message {
|
||||
MessageInt() {} // cereal needs this
|
||||
@ -52,7 +53,7 @@ CEREAL_REGISTER_TYPE(RequestMessage);
|
||||
TEST(SimpleTests, StartAndStopServices) {
|
||||
DistributedSystem system;
|
||||
// do nothing
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
std::this_thread::sleep_for(500ms);
|
||||
system.StopServices();
|
||||
}
|
||||
|
||||
@ -65,13 +66,14 @@ TEST(SimpleTests, StartAndStopServices) {
|
||||
TEST(SimpleTests, SendEmptyMessage) {
|
||||
DistributedSystem system;
|
||||
|
||||
system.Spawn("master", [](Reactor &r) {
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
auto writer = r.system_.FindChannel("127.0.0.1", 10000, "worker", "main");
|
||||
writer->Send<Message>();
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
system.Spawn("worker", [](Reactor &r) {
|
||||
auto worker = system.Spawn("worker", [](Reactor &r) {
|
||||
r.main_.first->OnEventOnce().ChainOnce<Message>(
|
||||
[&](const Message &, const Subscription &subscription) {
|
||||
// if this message isn't delivered, the main channel will never be
|
||||
@ -80,6 +82,7 @@ TEST(SimpleTests, SendEmptyMessage) {
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(400ms);
|
||||
system.StopServices();
|
||||
}
|
||||
|
||||
@ -93,7 +96,8 @@ TEST(SimpleTests, SendEmptyMessage) {
|
||||
TEST(SimpleTests, SendReturnAddressMessage) {
|
||||
DistributedSystem system;
|
||||
|
||||
system.Spawn("master", [](Reactor &r) {
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
auto writer = r.system_.FindChannel("127.0.0.1", 10000, "worker", "main");
|
||||
writer->Send<ReturnAddressMessage>(r.name(), "main");
|
||||
r.main_.first->OnEvent<MessageInt>(
|
||||
@ -102,7 +106,7 @@ TEST(SimpleTests, SendReturnAddressMessage) {
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
});
|
||||
system.Spawn("worker", [](Reactor &r) {
|
||||
auto worker = system.Spawn("worker", [](Reactor &r) {
|
||||
r.main_.first->OnEvent<ReturnAddressMessage>(
|
||||
[&](const ReturnAddressMessage &message, const Subscription &) {
|
||||
message.FindChannel(r.system_)->Send<MessageInt>(5);
|
||||
@ -110,6 +114,7 @@ TEST(SimpleTests, SendReturnAddressMessage) {
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(400ms);
|
||||
system.StopServices();
|
||||
}
|
||||
|
||||
@ -123,7 +128,8 @@ TEST(SimpleTests, SendReturnAddressMessage) {
|
||||
TEST(SimpleTests, SendSerializableMessage) {
|
||||
DistributedSystem system;
|
||||
|
||||
system.Spawn("master", [](Reactor &r) {
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
auto writer = r.system_.FindChannel("127.0.0.1", 10000, "worker", "main");
|
||||
writer->Send<RequestMessage>(r.name(), "main", 123);
|
||||
r.main_.first->OnEvent<MessageInt>(
|
||||
@ -133,7 +139,7 @@ TEST(SimpleTests, SendSerializableMessage) {
|
||||
});
|
||||
});
|
||||
|
||||
system.Spawn("worker", [](Reactor &r) {
|
||||
auto worker = system.Spawn("worker", [](Reactor &r) {
|
||||
r.main_.first->OnEvent<RequestMessage>(
|
||||
[&](const RequestMessage &message, const Subscription &) {
|
||||
ASSERT_EQ(message.x, 123);
|
||||
@ -142,6 +148,7 @@ TEST(SimpleTests, SendSerializableMessage) {
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(400ms);
|
||||
system.StopServices();
|
||||
}
|
||||
|
||||
|
@ -11,47 +11,26 @@
|
||||
#include "gtest/gtest.h"
|
||||
#include "utils/exceptions.hpp"
|
||||
|
||||
using namespace std::literals::chrono_literals;
|
||||
using namespace communication::reactor;
|
||||
using Subscription = EventStream::Subscription;
|
||||
|
||||
TEST(SystemTest, ReturnWithoutThrowing) {
|
||||
System system;
|
||||
ASSERT_NO_THROW(
|
||||
system.Spawn("master", [](Reactor &r) { r.CloseChannel("main"); }));
|
||||
ASSERT_NO_THROW(system.AwaitShutdown());
|
||||
auto master =
|
||||
system.Spawn("master", [](Reactor &r) { r.CloseChannel("main"); });
|
||||
std::this_thread::sleep_for(100ms);
|
||||
}
|
||||
|
||||
TEST(ChannelCreationTest, ThrowOnReusingChannelName) {
|
||||
System system;
|
||||
system.Spawn("master", [](Reactor &r) {
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
r.Open("channel");
|
||||
ASSERT_THROW(r.Open("channel"), utils::BasicException);
|
||||
r.CloseChannel("main");
|
||||
r.CloseChannel("channel");
|
||||
});
|
||||
system.AwaitShutdown();
|
||||
}
|
||||
|
||||
TEST(ChannelSetUpTest, CheckMainChannelIsSet) {
|
||||
System system;
|
||||
|
||||
system.Spawn("master", [](Reactor &r) {
|
||||
std::shared_ptr<ChannelWriter> channel_writer;
|
||||
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
system.Spawn("worker", [](Reactor &r) {
|
||||
std::shared_ptr<ChannelWriter> channel_writer;
|
||||
while (!(channel_writer = r.system_.FindChannel("master", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
system.AwaitShutdown();
|
||||
std::this_thread::sleep_for(100ms);
|
||||
}
|
||||
|
||||
TEST(SimpleSendTest, OneCallback) {
|
||||
@ -61,15 +40,14 @@ TEST(SimpleSendTest, OneCallback) {
|
||||
};
|
||||
|
||||
System system;
|
||||
system.Spawn("master", [](Reactor &r) {
|
||||
std::shared_ptr<ChannelWriter> channel_writer;
|
||||
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
auto channel_writer = r.system_.FindChannel("worker", "main");
|
||||
channel_writer->Send<MessageInt>(888);
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
system.Spawn("worker", [](Reactor &r) {
|
||||
auto worker = system.Spawn("worker", [](Reactor &r) {
|
||||
EventStream *stream = r.main_.first;
|
||||
|
||||
stream->OnEvent<MessageInt>(
|
||||
@ -78,8 +56,7 @@ TEST(SimpleSendTest, OneCallback) {
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
});
|
||||
|
||||
system.AwaitShutdown();
|
||||
std::this_thread::sleep_for(200ms);
|
||||
}
|
||||
|
||||
TEST(SimpleSendTest, IgnoreAfterClose) {
|
||||
@ -90,10 +67,10 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
|
||||
|
||||
System system;
|
||||
|
||||
system.Spawn("master", [](Reactor &r) {
|
||||
std::shared_ptr<ChannelWriter> channel_writer;
|
||||
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
std::shared_ptr<ChannelWriter> channel_writer =
|
||||
r.system_.FindChannel("worker", "main");
|
||||
channel_writer->Send<MessageInt>(101);
|
||||
channel_writer->Send<MessageInt>(102); // should be ignored
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
@ -103,7 +80,7 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
system.Spawn("worker", [](Reactor &r) {
|
||||
auto worker = system.Spawn("worker", [](Reactor &r) {
|
||||
EventStream *stream = r.main_.first;
|
||||
stream->OnEvent<MessageInt>(
|
||||
[&r](const MessageInt &msg, const Subscription &) {
|
||||
@ -112,7 +89,53 @@ TEST(SimpleSendTest, IgnoreAfterClose) {
|
||||
});
|
||||
});
|
||||
|
||||
system.AwaitShutdown();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
}
|
||||
|
||||
TEST(SimpleSendTest, RecreateChannelAfterClosing) {
|
||||
struct MessageInt : public Message {
|
||||
MessageInt(int xx) : x(xx) {}
|
||||
int x;
|
||||
};
|
||||
|
||||
System system;
|
||||
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
auto channel_writer = r.system_.FindChannel("worker", "main");
|
||||
// Original "worker" reactor will die after it process this message.
|
||||
channel_writer->Send<MessageInt>(101);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
// This message will be dropped since there is no reactor with name
|
||||
// "worker".
|
||||
channel_writer->Send<MessageInt>(102);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||
// This message should recieved by new "worker" reactor.
|
||||
channel_writer->Send<MessageInt>(103);
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
auto worker = system.Spawn("worker", [](Reactor &r) {
|
||||
EventStream *stream = r.main_.first;
|
||||
stream->OnEvent<MessageInt>(
|
||||
[&r](const MessageInt &msg, const Subscription &) {
|
||||
r.CloseChannel("main");
|
||||
ASSERT_EQ(msg.x, 101);
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
|
||||
auto worker2 = system.Spawn("worker", [](Reactor &r) {
|
||||
EventStream *stream = r.main_.first;
|
||||
stream->OnEvent<MessageInt>(
|
||||
[&r](const MessageInt &msg, const Subscription &) {
|
||||
r.CloseChannel("main");
|
||||
ASSERT_EQ(msg.x, 103);
|
||||
});
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
}
|
||||
|
||||
TEST(SimpleSendTest, DuringFirstEvent) {
|
||||
@ -125,7 +148,7 @@ TEST(SimpleSendTest, DuringFirstEvent) {
|
||||
|
||||
std::promise<int> p;
|
||||
auto f = p.get_future();
|
||||
system.Spawn("master", [&p](Reactor &r) mutable {
|
||||
auto master = system.Spawn("master", [&p](Reactor &r) mutable {
|
||||
EventStream *stream = r.main_.first;
|
||||
|
||||
stream->OnEvent<MessageInt>(
|
||||
@ -145,7 +168,6 @@ TEST(SimpleSendTest, DuringFirstEvent) {
|
||||
|
||||
f.wait();
|
||||
ASSERT_EQ(f.get(), 777);
|
||||
system.AwaitShutdown();
|
||||
}
|
||||
|
||||
TEST(MultipleSendTest, UnsubscribeService) {
|
||||
@ -160,10 +182,10 @@ TEST(MultipleSendTest, UnsubscribeService) {
|
||||
|
||||
System system;
|
||||
|
||||
system.Spawn("master", [](Reactor &r) {
|
||||
std::shared_ptr<ChannelWriter> channel_writer;
|
||||
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
std::shared_ptr<ChannelWriter> channel_writer =
|
||||
r.system_.FindChannel("worker", "main");
|
||||
channel_writer->Send<MessageInt>(55);
|
||||
channel_writer->Send<MessageInt>(66);
|
||||
channel_writer->Send<MessageInt>(77);
|
||||
@ -176,30 +198,31 @@ TEST(MultipleSendTest, UnsubscribeService) {
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
system.Spawn("worker", [num_received_messages = 0](Reactor & r) mutable {
|
||||
EventStream *stream = r.main_.first;
|
||||
auto worker =
|
||||
system.Spawn("worker", [num_received_messages = 0](Reactor & r) mutable {
|
||||
EventStream *stream = r.main_.first;
|
||||
|
||||
stream->OnEvent<MessageInt>(
|
||||
[&](const MessageInt &msgint, const Subscription &subscription) {
|
||||
ASSERT_TRUE(msgint.x == 55 || msgint.x == 66);
|
||||
++num_received_messages;
|
||||
if (msgint.x == 66) {
|
||||
subscription.Unsubscribe(); // receive only two of them
|
||||
}
|
||||
});
|
||||
stream->OnEvent<MessageChar>(
|
||||
[&](const MessageChar &msgchar, const Subscription &subscription) {
|
||||
char c = msgchar.x;
|
||||
++num_received_messages;
|
||||
ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c');
|
||||
if (num_received_messages == 5) {
|
||||
subscription.Unsubscribe();
|
||||
r.CloseChannel("main");
|
||||
}
|
||||
});
|
||||
});
|
||||
stream->OnEvent<MessageInt>(
|
||||
[&](const MessageInt &msgint, const Subscription &subscription) {
|
||||
ASSERT_TRUE(msgint.x == 55 || msgint.x == 66);
|
||||
++num_received_messages;
|
||||
if (msgint.x == 66) {
|
||||
subscription.Unsubscribe(); // receive only two of them
|
||||
}
|
||||
});
|
||||
stream->OnEvent<MessageChar>(
|
||||
[&](const MessageChar &msgchar, const Subscription &subscription) {
|
||||
char c = msgchar.x;
|
||||
++num_received_messages;
|
||||
ASSERT_TRUE(c == 'a' || c == 'b' || c == 'c');
|
||||
if (num_received_messages == 5) {
|
||||
subscription.Unsubscribe();
|
||||
r.CloseChannel("main");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
system.AwaitShutdown();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
}
|
||||
|
||||
TEST(MultipleSendTest, OnEvent) {
|
||||
@ -213,10 +236,10 @@ TEST(MultipleSendTest, OnEvent) {
|
||||
};
|
||||
|
||||
System system;
|
||||
system.Spawn("master", [](Reactor &r) {
|
||||
std::shared_ptr<ChannelWriter> channel_writer;
|
||||
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
std::shared_ptr<ChannelWriter> channel_writer =
|
||||
r.system_.FindChannel("worker", "main");
|
||||
|
||||
channel_writer->Send<MessageInt>(101);
|
||||
channel_writer->Send<MessageChar>('a');
|
||||
@ -225,7 +248,7 @@ TEST(MultipleSendTest, OnEvent) {
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
system.Spawn("worker", [correct_vals = 0](Reactor & r) mutable {
|
||||
auto worker = system.Spawn("worker", [correct_vals = 0](Reactor & r) mutable {
|
||||
struct EndMessage : Message {};
|
||||
EventStream *stream = r.main_.first;
|
||||
|
||||
@ -251,7 +274,7 @@ TEST(MultipleSendTest, OnEvent) {
|
||||
});
|
||||
});
|
||||
|
||||
system.AwaitShutdown();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
}
|
||||
|
||||
TEST(MultipleSendTest, Chaining) {
|
||||
@ -262,17 +285,17 @@ TEST(MultipleSendTest, Chaining) {
|
||||
|
||||
System system;
|
||||
|
||||
system.Spawn("master", [](Reactor &r) {
|
||||
std::shared_ptr<ChannelWriter> channel_writer;
|
||||
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
std::shared_ptr<ChannelWriter> channel_writer =
|
||||
r.system_.FindChannel("worker", "main");
|
||||
channel_writer->Send<MessageInt>(55);
|
||||
channel_writer->Send<MessageInt>(66);
|
||||
channel_writer->Send<MessageInt>(77);
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
system.Spawn("worker", [](Reactor &r) {
|
||||
auto worker = system.Spawn("worker", [](Reactor &r) {
|
||||
EventStream *stream = r.main_.first;
|
||||
|
||||
stream->OnEventOnce()
|
||||
@ -288,8 +311,7 @@ TEST(MultipleSendTest, Chaining) {
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
});
|
||||
|
||||
system.AwaitShutdown();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
}
|
||||
|
||||
TEST(MultipleSendTest, ChainingInRightOrder) {
|
||||
@ -305,10 +327,10 @@ TEST(MultipleSendTest, ChainingInRightOrder) {
|
||||
|
||||
System system;
|
||||
|
||||
system.Spawn("master", [](Reactor &r) {
|
||||
std::shared_ptr<ChannelWriter> channel_writer;
|
||||
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
std::shared_ptr<ChannelWriter> channel_writer =
|
||||
r.system_.FindChannel("worker", "main");
|
||||
channel_writer->Send<MessageChar>('a');
|
||||
channel_writer->Send<MessageInt>(55);
|
||||
channel_writer->Send<MessageChar>('b');
|
||||
@ -316,7 +338,8 @@ TEST(MultipleSendTest, ChainingInRightOrder) {
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
system.Spawn("worker", [](Reactor &r) {
|
||||
auto worker = system.Spawn("worker", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
EventStream *stream = r.main_.first;
|
||||
stream->OnEventOnce()
|
||||
.ChainOnce<MessageInt>([](const MessageInt &msg, const Subscription &) {
|
||||
@ -333,7 +356,7 @@ TEST(MultipleSendTest, ChainingInRightOrder) {
|
||||
});
|
||||
});
|
||||
|
||||
system.AwaitShutdown();
|
||||
std::this_thread::sleep_for(300ms);
|
||||
}
|
||||
|
||||
TEST(MultipleSendTest, ProcessManyMessages) {
|
||||
@ -346,12 +369,11 @@ TEST(MultipleSendTest, ProcessManyMessages) {
|
||||
|
||||
System system;
|
||||
|
||||
system.Spawn("master", [](Reactor &r) {
|
||||
std::shared_ptr<ChannelWriter> channel_writer;
|
||||
while (!(channel_writer = r.system_.FindChannel("worker", "main")))
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(300));
|
||||
auto master = system.Spawn("master", [](Reactor &r) {
|
||||
std::this_thread::sleep_for(100ms);
|
||||
std::shared_ptr<ChannelWriter> channel_writer =
|
||||
r.system_.FindChannel("worker", "main");
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 100));
|
||||
for (int i = 0; i < kNumTests; ++i) {
|
||||
channel_writer->Send<MessageInt>(rand());
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 5));
|
||||
@ -359,7 +381,7 @@ TEST(MultipleSendTest, ProcessManyMessages) {
|
||||
r.CloseChannel("main");
|
||||
});
|
||||
|
||||
system.Spawn("worker", [vals = 0](Reactor & r) mutable {
|
||||
auto worker = system.Spawn("worker", [vals = 0](Reactor & r) mutable {
|
||||
struct EndMessage : Message {};
|
||||
EventStream *stream = r.main_.first;
|
||||
vals = 0;
|
||||
@ -376,7 +398,8 @@ TEST(MultipleSendTest, ProcessManyMessages) {
|
||||
}
|
||||
});
|
||||
});
|
||||
system.AwaitShutdown();
|
||||
|
||||
std::this_thread::sleep_for(1000ms);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
|
Loading…
Reference in New Issue
Block a user