Compare commits
2 Commits
master
...
T0785-MG-b
Author | SHA1 | Date | |
---|---|---|---|
|
ebe54d759e | ||
|
492a13d5e7 |
@ -6,8 +6,10 @@ set(communication_src_files
|
||||
helpers.cpp
|
||||
init.cpp)
|
||||
|
||||
find_package(Boost REQUIRED)
|
||||
|
||||
add_library(mg-communication STATIC ${communication_src_files})
|
||||
target_link_libraries(mg-communication Threads::Threads mg-utils mg-io fmt::fmt gflags)
|
||||
target_link_libraries(mg-communication Boost::headers Threads::Threads mg-utils mg-io fmt::fmt gflags)
|
||||
|
||||
find_package(OpenSSL REQUIRED)
|
||||
target_link_libraries(mg-communication ${OPENSSL_LIBRARIES})
|
||||
|
94
src/communication/websocket/listener.hpp
Normal file
94
src/communication/websocket/listener.hpp
Normal file
@ -0,0 +1,94 @@
|
||||
// Copyright 2021 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <list>
|
||||
#include <memory>
|
||||
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
#include <boost/beast/core.hpp>
|
||||
|
||||
#include "communication/websocket/session.hpp"
|
||||
#include "utils/spin_lock.hpp"
|
||||
#include "utils/synchronized.hpp"
|
||||
|
||||
namespace communication::websocket {
|
||||
template <typename TSession, typename TSessionData>
|
||||
class Listener : public std::enable_shared_from_this<Listener<TSession, TSessionData>> {
|
||||
using tcp = boost::asio::ip::tcp;
|
||||
|
||||
public:
|
||||
template <typename... Args>
|
||||
static std::shared_ptr<Listener> Create(Args &&...args) {
|
||||
return std::shared_ptr<Listener>{new Listener(std::forward<Args>(args)...)};
|
||||
}
|
||||
|
||||
// Start accepting incoming connections
|
||||
void Run() {
|
||||
// The new connection gets its own strand
|
||||
acceptor_.async_accept(ioc_, [shared_this = this->shared_from_this()](auto ec, auto socket) {
|
||||
shared_this->OnAccept(ec, std::move(socket));
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
Listener(boost::asio::io_context &ioc, tcp::endpoint endpoint, TSessionData *data)
|
||||
: data_{data}, ioc_(ioc), acceptor_(ioc) {
|
||||
boost::beast::error_code ec;
|
||||
|
||||
// Open the acceptor
|
||||
acceptor_.open(endpoint.protocol(), ec);
|
||||
if (ec) {
|
||||
LogError(ec, "open");
|
||||
return;
|
||||
}
|
||||
|
||||
// Allow address reuse
|
||||
acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec);
|
||||
if (ec) {
|
||||
LogError(ec, "set_option");
|
||||
return;
|
||||
}
|
||||
|
||||
// Bind to the server address
|
||||
acceptor_.bind(endpoint, ec);
|
||||
if (ec) {
|
||||
LogError(ec, "bind");
|
||||
return;
|
||||
}
|
||||
|
||||
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
|
||||
if (ec) {
|
||||
LogError(ec, "listen");
|
||||
return;
|
||||
}
|
||||
|
||||
spdlog::info("WebSocket server is listening on {}:{}", endpoint.address(), endpoint.port());
|
||||
}
|
||||
|
||||
void OnAccept(boost::beast::error_code ec, tcp::socket socket) {
|
||||
if (ec) {
|
||||
return LogError(ec, "accept");
|
||||
}
|
||||
|
||||
Session<TSession, TSessionData>::Create(std::move(socket), data_)->Run();
|
||||
Run();
|
||||
}
|
||||
|
||||
TSessionData *data_;
|
||||
boost::asio::io_context &ioc_;
|
||||
tcp::acceptor acceptor_;
|
||||
};
|
||||
} // namespace communication::websocket
|
69
src/communication/websocket/server.hpp
Normal file
69
src/communication/websocket/server.hpp
Normal file
@ -0,0 +1,69 @@
|
||||
// Copyright 2021 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#define BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT
|
||||
|
||||
#include <thread>
|
||||
|
||||
#include <spdlog/sinks/base_sink.h>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
||||
#include "communication/websocket/listener.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
|
||||
namespace communication::websocket {
|
||||
|
||||
template <typename TSession, typename TSessionData>
|
||||
class Server final {
|
||||
using tcp = boost::asio::ip::tcp;
|
||||
|
||||
public:
|
||||
explicit Server(io::network::Endpoint endpoint, TSessionData *data)
|
||||
: ioc_{},
|
||||
listener_{Listener<TSession, TSessionData>::Create(
|
||||
ioc_, tcp::endpoint{boost::asio::ip::make_address(endpoint.address), endpoint.port}, data)} {}
|
||||
|
||||
Server(const Server &) = delete;
|
||||
Server(Server &&) = delete;
|
||||
Server &operator=(const Server &) = delete;
|
||||
Server &operator=(Server &&) = delete;
|
||||
|
||||
~Server() {
|
||||
MG_ASSERT(!background_thread_ || (ioc_.stopped() && !background_thread_->joinable()),
|
||||
"Server wasn't shutdown properly");
|
||||
}
|
||||
|
||||
void Start() {
|
||||
MG_ASSERT(!background_thread_, "The server was already started!");
|
||||
listener_->Run();
|
||||
background_thread_.emplace([this] { ioc_.run(); });
|
||||
}
|
||||
|
||||
void Shutdown() { ioc_.stop(); }
|
||||
|
||||
void AwaitShutdown() {
|
||||
if (background_thread_ && background_thread_->joinable()) {
|
||||
background_thread_->join();
|
||||
}
|
||||
}
|
||||
|
||||
bool IsRunning() const { return background_thread_ && !ioc_.stopped(); }
|
||||
|
||||
private:
|
||||
boost::asio::io_context ioc_;
|
||||
|
||||
std::shared_ptr<Listener<TSession, TSessionData>> listener_;
|
||||
std::optional<std::thread> background_thread_;
|
||||
};
|
||||
} // namespace communication::websocket
|
157
src/communication/websocket/session.hpp
Normal file
157
src/communication/websocket/session.hpp
Normal file
@ -0,0 +1,157 @@
|
||||
// Copyright 2021 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <deque>
|
||||
#include <memory>
|
||||
#include <span>
|
||||
|
||||
#include <boost/asio/dispatch.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
#include <boost/beast/core/tcp_stream.hpp>
|
||||
#include <boost/beast/http.hpp>
|
||||
#include <boost/beast/websocket.hpp>
|
||||
|
||||
#include "communication/buffer.hpp"
|
||||
#include "communication/session.hpp"
|
||||
|
||||
namespace communication::websocket {
|
||||
void LogError(boost::beast::error_code ec, const std::string_view what) {
|
||||
spdlog::warn("Websocket session failed on {}: {}", what, ec.message());
|
||||
}
|
||||
|
||||
template <typename TSession, typename TSessionData>
|
||||
class Session : public std::enable_shared_from_this<Session<TSession, TSessionData>> {
|
||||
using tcp = boost::asio::ip::tcp;
|
||||
|
||||
public:
|
||||
template <typename... Args>
|
||||
static std::shared_ptr<Session> Create(Args &&...args) {
|
||||
return std::shared_ptr<Session>{new Session{std::forward<Args>(args)...}};
|
||||
}
|
||||
|
||||
void Run() {
|
||||
boost::asio::dispatch(strand_, [shared_this = this->shared_from_this()] { shared_this->OnRun(); });
|
||||
}
|
||||
|
||||
private:
|
||||
explicit Session(tcp::socket &&socket, TSessionData *data)
|
||||
: endpoint_(socket.local_endpoint()),
|
||||
ws_(std::move(socket)),
|
||||
strand_{boost::asio::make_strand(ws_.get_executor())},
|
||||
data_{data} {}
|
||||
|
||||
void OnRun() {
|
||||
ws_.set_option(boost::beast::websocket::stream_base::timeout::suggested(boost::beast::role_type::server));
|
||||
boost::asio::socket_base::keep_alive option(true);
|
||||
ws_.set_option(boost::beast::websocket::stream_base::decorator([](boost::beast::websocket::response_type &res) {
|
||||
res.set(boost::beast::http::field::server, "Memgraph WS");
|
||||
res.set(boost::beast::http::field::sec_websocket_protocol, "binary");
|
||||
}));
|
||||
|
||||
ws_.binary(true);
|
||||
// This buffer will hold the HTTP request as raw characters
|
||||
|
||||
// This buffer is required for reading HTTP messages
|
||||
// flat_buffer buffer;
|
||||
|
||||
// Read the HTTP request ourselves
|
||||
// boost::beast::http::request<http::string_body> req;
|
||||
// http::read(sock, buffer, req);
|
||||
// std::cout << "bu
|
||||
// Read into our buffer until we reach the end of the HTTP request.
|
||||
// No parsing takes place here, we are just accumulating data.
|
||||
|
||||
// boost::beast::net::read_until(sock, net::dynamic_buffer(s), "\r\n\r\n");
|
||||
|
||||
// Now accept the connection, using the buffered data.
|
||||
// ws_.accept(net::buffer(s));
|
||||
ws_.async_accept(boost::asio::bind_executor(
|
||||
strand_, [shared_this = this->shared_from_this()](auto ec) { shared_this->OnAccept(ec); }));
|
||||
}
|
||||
|
||||
void OnAccept(boost::beast::error_code ec) {
|
||||
if (ec) {
|
||||
return LogError(ec, "accept");
|
||||
}
|
||||
|
||||
session_data_.emplace(this->shared_from_this());
|
||||
|
||||
// run on the strand
|
||||
boost::asio::dispatch(strand_, [shared_this = this->shared_from_this()] { shared_this->DoRead(); });
|
||||
}
|
||||
|
||||
void DoWrite(const uint8_t *data, size_t len, bool have_more = false) {
|
||||
boost::beast::error_code ec;
|
||||
ws_.write(boost::asio::const_buffer{data, len}, ec);
|
||||
if (ec) {
|
||||
return LogError(ec, "write");
|
||||
}
|
||||
}
|
||||
|
||||
void DoRead() {
|
||||
auto buf = session_data_->input_buffer_.write_end()->Allocate();
|
||||
|
||||
auto mutable_buffer = std::make_unique<boost::asio::mutable_buffer>(buf.data, buf.len);
|
||||
|
||||
ws_.async_read_some(
|
||||
*mutable_buffer,
|
||||
boost::asio::bind_executor(
|
||||
strand_, [mutable_buffer = std::move(mutable_buffer), shared_this = this->shared_from_this()](
|
||||
auto ec, auto bytes_transferred) { shared_this->OnRead(ec, bytes_transferred); }));
|
||||
}
|
||||
|
||||
void OnRead(boost::beast::error_code ec, size_t bytes_transferred) {
|
||||
if (ec == boost::beast::websocket::error::closed) {
|
||||
return;
|
||||
}
|
||||
if (ec) {
|
||||
return LogError(ec, "read");
|
||||
}
|
||||
|
||||
session_data_->input_buffer_.write_end()->Written(bytes_transferred);
|
||||
|
||||
try {
|
||||
session_data_->session_.Execute();
|
||||
} catch (const SessionClosedException &e) {
|
||||
spdlog::info("Closed connection");
|
||||
return;
|
||||
}
|
||||
DoRead();
|
||||
}
|
||||
|
||||
boost::asio::ip::tcp::endpoint endpoint_;
|
||||
boost::beast::websocket::stream<boost::beast::tcp_stream> ws_;
|
||||
boost::beast::flat_buffer buffer_;
|
||||
boost::asio::strand<decltype(ws_)::executor_type> strand_;
|
||||
|
||||
TSessionData *data_;
|
||||
|
||||
struct SessionData {
|
||||
explicit SessionData(std::shared_ptr<Session<TSession, TSessionData>> session)
|
||||
: output_stream_([session](const uint8_t *data, size_t len, bool have_more) {
|
||||
session->DoWrite(data, len, have_more);
|
||||
return true;
|
||||
}),
|
||||
session_(session->data_,
|
||||
io::network::Endpoint{session->endpoint_.address().to_string(), session->endpoint_.port()},
|
||||
input_buffer_.read_end(), &output_stream_) {}
|
||||
|
||||
Buffer input_buffer_;
|
||||
communication::OutputStream output_stream_;
|
||||
TSession session_;
|
||||
};
|
||||
|
||||
std::optional<SessionData> session_data_;
|
||||
};
|
||||
} // namespace communication::websocket
|
@ -32,6 +32,7 @@
|
||||
#include <spdlog/sinks/stdout_color_sinks.h>
|
||||
|
||||
#include "communication/bolt/v1/constants.hpp"
|
||||
#include "communication/websocket/server.hpp"
|
||||
#include "helpers.hpp"
|
||||
#include "py/py.hpp"
|
||||
#include "query/auth_checker.hpp"
|
||||
@ -1177,8 +1178,8 @@ int main(int argc, char **argv) {
|
||||
spdlog::warn(utils::MessageWithLink("Using non-secure Bolt connection (without SSL).", "https://memgr.ph/ssl"));
|
||||
}
|
||||
|
||||
ServerT server({FLAGS_bolt_address, static_cast<uint16_t>(FLAGS_bolt_port)}, &session_data, &context,
|
||||
FLAGS_bolt_session_inactivity_timeout, service_name, FLAGS_bolt_num_workers);
|
||||
auto server = communication::websocket::Server<BoltSession, SessionData>{
|
||||
{FLAGS_bolt_address, static_cast<uint16_t>(FLAGS_bolt_port)}, &session_data};
|
||||
|
||||
// Setup telemetry
|
||||
std::optional<telemetry::Telemetry> telemetry;
|
||||
@ -1212,7 +1213,7 @@ int main(int argc, char **argv) {
|
||||
};
|
||||
InitSignalHandlers(shutdown);
|
||||
|
||||
MG_ASSERT(server.Start(), "Couldn't start the Bolt server!");
|
||||
server.Start();
|
||||
server.AwaitShutdown();
|
||||
query::procedure::gModuleRegistry.UnloadAllModules();
|
||||
|
||||
|
@ -9,7 +9,7 @@
|
||||
<p>Check console for Cypher query outputs...</p>
|
||||
<script>
|
||||
const driver = neo4j.driver(
|
||||
"bolt://localhost:9999",
|
||||
"bolt://localhost:7687",
|
||||
neo4j.auth.basic("", ""),
|
||||
);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user