Compare commits

...

2 Commits

Author SHA1 Message Date
Kostas
ebe54d759e Fix websocket headers 2022-01-24 12:07:35 +02:00
Antonio Andelic
492a13d5e7 Bolt over websocket initial 2022-01-03 16:39:44 +01:00
6 changed files with 328 additions and 5 deletions

View File

@ -6,8 +6,10 @@ set(communication_src_files
helpers.cpp
init.cpp)
find_package(Boost REQUIRED)
add_library(mg-communication STATIC ${communication_src_files})
target_link_libraries(mg-communication Threads::Threads mg-utils mg-io fmt::fmt gflags)
target_link_libraries(mg-communication Boost::headers Threads::Threads mg-utils mg-io fmt::fmt gflags)
find_package(OpenSSL REQUIRED)
target_link_libraries(mg-communication ${OPENSSL_LIBRARIES})

View File

@ -0,0 +1,94 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <list>
#include <memory>
#include <spdlog/spdlog.h>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core.hpp>
#include "communication/websocket/session.hpp"
#include "utils/spin_lock.hpp"
#include "utils/synchronized.hpp"
namespace communication::websocket {
template <typename TSession, typename TSessionData>
class Listener : public std::enable_shared_from_this<Listener<TSession, TSessionData>> {
using tcp = boost::asio::ip::tcp;
public:
template <typename... Args>
static std::shared_ptr<Listener> Create(Args &&...args) {
return std::shared_ptr<Listener>{new Listener(std::forward<Args>(args)...)};
}
// Start accepting incoming connections
void Run() {
// The new connection gets its own strand
acceptor_.async_accept(ioc_, [shared_this = this->shared_from_this()](auto ec, auto socket) {
shared_this->OnAccept(ec, std::move(socket));
});
}
private:
Listener(boost::asio::io_context &ioc, tcp::endpoint endpoint, TSessionData *data)
: data_{data}, ioc_(ioc), acceptor_(ioc) {
boost::beast::error_code ec;
// Open the acceptor
acceptor_.open(endpoint.protocol(), ec);
if (ec) {
LogError(ec, "open");
return;
}
// Allow address reuse
acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec);
if (ec) {
LogError(ec, "set_option");
return;
}
// Bind to the server address
acceptor_.bind(endpoint, ec);
if (ec) {
LogError(ec, "bind");
return;
}
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
if (ec) {
LogError(ec, "listen");
return;
}
spdlog::info("WebSocket server is listening on {}:{}", endpoint.address(), endpoint.port());
}
void OnAccept(boost::beast::error_code ec, tcp::socket socket) {
if (ec) {
return LogError(ec, "accept");
}
Session<TSession, TSessionData>::Create(std::move(socket), data_)->Run();
Run();
}
TSessionData *data_;
boost::asio::io_context &ioc_;
tcp::acceptor acceptor_;
};
} // namespace communication::websocket

View File

@ -0,0 +1,69 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#define BOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT
#include <thread>
#include <spdlog/sinks/base_sink.h>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include "communication/websocket/listener.hpp"
#include "io/network/endpoint.hpp"
namespace communication::websocket {
template <typename TSession, typename TSessionData>
class Server final {
using tcp = boost::asio::ip::tcp;
public:
explicit Server(io::network::Endpoint endpoint, TSessionData *data)
: ioc_{},
listener_{Listener<TSession, TSessionData>::Create(
ioc_, tcp::endpoint{boost::asio::ip::make_address(endpoint.address), endpoint.port}, data)} {}
Server(const Server &) = delete;
Server(Server &&) = delete;
Server &operator=(const Server &) = delete;
Server &operator=(Server &&) = delete;
~Server() {
MG_ASSERT(!background_thread_ || (ioc_.stopped() && !background_thread_->joinable()),
"Server wasn't shutdown properly");
}
void Start() {
MG_ASSERT(!background_thread_, "The server was already started!");
listener_->Run();
background_thread_.emplace([this] { ioc_.run(); });
}
void Shutdown() { ioc_.stop(); }
void AwaitShutdown() {
if (background_thread_ && background_thread_->joinable()) {
background_thread_->join();
}
}
bool IsRunning() const { return background_thread_ && !ioc_.stopped(); }
private:
boost::asio::io_context ioc_;
std::shared_ptr<Listener<TSession, TSessionData>> listener_;
std::optional<std::thread> background_thread_;
};
} // namespace communication::websocket

View File

@ -0,0 +1,157 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <deque>
#include <memory>
#include <span>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/websocket.hpp>
#include "communication/buffer.hpp"
#include "communication/session.hpp"
namespace communication::websocket {
void LogError(boost::beast::error_code ec, const std::string_view what) {
spdlog::warn("Websocket session failed on {}: {}", what, ec.message());
}
template <typename TSession, typename TSessionData>
class Session : public std::enable_shared_from_this<Session<TSession, TSessionData>> {
using tcp = boost::asio::ip::tcp;
public:
template <typename... Args>
static std::shared_ptr<Session> Create(Args &&...args) {
return std::shared_ptr<Session>{new Session{std::forward<Args>(args)...}};
}
void Run() {
boost::asio::dispatch(strand_, [shared_this = this->shared_from_this()] { shared_this->OnRun(); });
}
private:
explicit Session(tcp::socket &&socket, TSessionData *data)
: endpoint_(socket.local_endpoint()),
ws_(std::move(socket)),
strand_{boost::asio::make_strand(ws_.get_executor())},
data_{data} {}
void OnRun() {
ws_.set_option(boost::beast::websocket::stream_base::timeout::suggested(boost::beast::role_type::server));
boost::asio::socket_base::keep_alive option(true);
ws_.set_option(boost::beast::websocket::stream_base::decorator([](boost::beast::websocket::response_type &res) {
res.set(boost::beast::http::field::server, "Memgraph WS");
res.set(boost::beast::http::field::sec_websocket_protocol, "binary");
}));
ws_.binary(true);
// This buffer will hold the HTTP request as raw characters
// This buffer is required for reading HTTP messages
// flat_buffer buffer;
// Read the HTTP request ourselves
// boost::beast::http::request<http::string_body> req;
// http::read(sock, buffer, req);
// std::cout << "bu
// Read into our buffer until we reach the end of the HTTP request.
// No parsing takes place here, we are just accumulating data.
// boost::beast::net::read_until(sock, net::dynamic_buffer(s), "\r\n\r\n");
// Now accept the connection, using the buffered data.
// ws_.accept(net::buffer(s));
ws_.async_accept(boost::asio::bind_executor(
strand_, [shared_this = this->shared_from_this()](auto ec) { shared_this->OnAccept(ec); }));
}
void OnAccept(boost::beast::error_code ec) {
if (ec) {
return LogError(ec, "accept");
}
session_data_.emplace(this->shared_from_this());
// run on the strand
boost::asio::dispatch(strand_, [shared_this = this->shared_from_this()] { shared_this->DoRead(); });
}
void DoWrite(const uint8_t *data, size_t len, bool have_more = false) {
boost::beast::error_code ec;
ws_.write(boost::asio::const_buffer{data, len}, ec);
if (ec) {
return LogError(ec, "write");
}
}
void DoRead() {
auto buf = session_data_->input_buffer_.write_end()->Allocate();
auto mutable_buffer = std::make_unique<boost::asio::mutable_buffer>(buf.data, buf.len);
ws_.async_read_some(
*mutable_buffer,
boost::asio::bind_executor(
strand_, [mutable_buffer = std::move(mutable_buffer), shared_this = this->shared_from_this()](
auto ec, auto bytes_transferred) { shared_this->OnRead(ec, bytes_transferred); }));
}
void OnRead(boost::beast::error_code ec, size_t bytes_transferred) {
if (ec == boost::beast::websocket::error::closed) {
return;
}
if (ec) {
return LogError(ec, "read");
}
session_data_->input_buffer_.write_end()->Written(bytes_transferred);
try {
session_data_->session_.Execute();
} catch (const SessionClosedException &e) {
spdlog::info("Closed connection");
return;
}
DoRead();
}
boost::asio::ip::tcp::endpoint endpoint_;
boost::beast::websocket::stream<boost::beast::tcp_stream> ws_;
boost::beast::flat_buffer buffer_;
boost::asio::strand<decltype(ws_)::executor_type> strand_;
TSessionData *data_;
struct SessionData {
explicit SessionData(std::shared_ptr<Session<TSession, TSessionData>> session)
: output_stream_([session](const uint8_t *data, size_t len, bool have_more) {
session->DoWrite(data, len, have_more);
return true;
}),
session_(session->data_,
io::network::Endpoint{session->endpoint_.address().to_string(), session->endpoint_.port()},
input_buffer_.read_end(), &output_stream_) {}
Buffer input_buffer_;
communication::OutputStream output_stream_;
TSession session_;
};
std::optional<SessionData> session_data_;
};
} // namespace communication::websocket

View File

@ -32,6 +32,7 @@
#include <spdlog/sinks/stdout_color_sinks.h>
#include "communication/bolt/v1/constants.hpp"
#include "communication/websocket/server.hpp"
#include "helpers.hpp"
#include "py/py.hpp"
#include "query/auth_checker.hpp"
@ -1177,8 +1178,8 @@ int main(int argc, char **argv) {
spdlog::warn(utils::MessageWithLink("Using non-secure Bolt connection (without SSL).", "https://memgr.ph/ssl"));
}
ServerT server({FLAGS_bolt_address, static_cast<uint16_t>(FLAGS_bolt_port)}, &session_data, &context,
FLAGS_bolt_session_inactivity_timeout, service_name, FLAGS_bolt_num_workers);
auto server = communication::websocket::Server<BoltSession, SessionData>{
{FLAGS_bolt_address, static_cast<uint16_t>(FLAGS_bolt_port)}, &session_data};
// Setup telemetry
std::optional<telemetry::Telemetry> telemetry;
@ -1212,7 +1213,7 @@ int main(int argc, char **argv) {
};
InitSignalHandlers(shutdown);
MG_ASSERT(server.Start(), "Couldn't start the Bolt server!");
server.Start();
server.AwaitShutdown();
query::procedure::gModuleRegistry.UnloadAllModules();

View File

@ -9,7 +9,7 @@
<p>Check console for Cypher query outputs...</p>
<script>
const driver = neo4j.driver(
"bolt://localhost:9999",
"bolt://localhost:7687",
neo4j.auth.basic("", ""),
);