Fix RPC network server segfault

Reviewers: teon.banek, buda

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1139
This commit is contained in:
Matej Ferencevic 2018-01-24 16:43:42 +01:00
parent fc20ddcd25
commit 60d7be5c19
4 changed files with 29 additions and 10 deletions

View File

@ -55,7 +55,13 @@ StreamBuffer Session::Allocate() { return buffer_.Allocate(); }
void Session::Written(size_t len) { buffer_.Written(len); }
void Session::Close() { DLOG(INFO) << "Closing session"; }
void Session::Close() {
DLOG(INFO) << "Closing session";
// We explicitly close the socket here to remove the socket from the epoll
// event loop. The response message send will fail but that is OK and
// intended because the remote side closed the connection.
socket_.get()->Close();
}
bool SendLength(Socket &socket, MessageSize length) {
return socket.Write(reinterpret_cast<uint8_t *>(&length),

View File

@ -73,8 +73,10 @@ class Server {
socket_.endpoint().port())
<< std::endl;
io::network::SocketEventDispatcher<ConnectionAcceptor> dispatcher;
ConnectionAcceptor acceptor(socket_, *this);
std::vector<std::unique_ptr<ConnectionAcceptor>> acceptors;
acceptors.emplace_back(std::make_unique<ConnectionAcceptor>(socket_, *this));
auto &acceptor = *acceptors.back().get();
io::network::SocketEventDispatcher<ConnectionAcceptor> dispatcher{acceptors};
dispatcher.AddListener(socket_.fd(), acceptor, EPOLLIN);
while (alive_) {
dispatcher.WaitAndProcessEvents();

View File

@ -188,7 +188,7 @@ class Worker {
SpinLock lock_;
TSessionData &session_data_;
io::network::SocketEventDispatcher<SessionSocketListener> dispatcher_;
std::vector<std::unique_ptr<SessionSocketListener>> session_listeners_;
io::network::SocketEventDispatcher<SessionSocketListener> dispatcher_{session_listeners_};
};
} // namespace communication

View File

@ -15,7 +15,9 @@ namespace io::network {
template <class Listener>
class SocketEventDispatcher {
public:
explicit SocketEventDispatcher(uint32_t flags = 0) : epoll_(flags) {}
explicit SocketEventDispatcher(
std::vector<std::unique_ptr<Listener>> &listeners, uint32_t flags = 0)
: epoll_(flags), listeners_(listeners) {}
void AddListener(int fd, Listener &listener, uint32_t events) {
// Add the listener associated to fd file descriptor to epoll.
@ -36,7 +38,15 @@ class SocketEventDispatcher {
// Go through all events and process them in order.
for (int i = 0; i < n; ++i) {
auto &event = events_[i];
Listener &listener = *reinterpret_cast<Listener *>(event.data.ptr);
Listener *listener = reinterpret_cast<Listener *>(event.data.ptr);
// Check if the listener that we got in the epoll event still exists
// because it might have been deleted in a previous call.
auto it =
std::find_if(listeners_.begin(), listeners_.end(),
[&](const auto &l) { return l.get() == listener; });
// If the listener doesn't exist anymore just ignore the event.
if (it == listeners_.end()) continue;
// Even though it is possible for multiple events to be reported we handle
// only one of them. Since we use epoll in level triggered mode
@ -49,23 +59,23 @@ class SocketEventDispatcher {
try {
if (event.events & EPOLLIN) {
// We have some data waiting to be read.
listener.OnData();
listener->OnData();
continue;
}
if (event.events & EPOLLRDHUP) {
listener.OnClose();
listener->OnClose();
continue;
}
// There was an error on the server side.
if (!(event.events & EPOLLIN) || event.events & (EPOLLHUP | EPOLLERR)) {
listener.OnError();
listener->OnError();
continue;
}
} catch (const std::exception &e) {
listener.OnException(e);
listener->OnException(e);
}
}
@ -79,6 +89,7 @@ class SocketEventDispatcher {
// socket).
Epoll epoll_;
Epoll::Event events_[kMaxEvents];
std::vector<std::unique_ptr<Listener>> &listeners_;
};
} // namespace io::network