diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d3fb67f0a..831e2edad 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -10,6 +10,7 @@ add_subdirectory(telemetry) add_subdirectory(communication) add_subdirectory(auth) add_subdirectory(slk) +add_subdirectory(rpc) add_subdirectory(storage/v2) add_subdirectory(query) @@ -93,7 +94,7 @@ add_subdirectory(query) # #set(MG_SINGLE_NODE_HA_LIBS stdc++fs Threads::Threads fmt cppitertools # antlr_opencypher_parser_lib dl glog gflags -# mg-utils mg-io mg-requests mg-communication mg-comm-rpc +# mg-utils mg-io mg-requests mg-communication mg-rpc # mg-auth) # #if (USE_LTALLOC) diff --git a/src/communication/CMakeLists.txt b/src/communication/CMakeLists.txt index f06a8d320..b537cf6db 100644 --- a/src/communication/CMakeLists.txt +++ b/src/communication/CMakeLists.txt @@ -10,12 +10,3 @@ add_library(mg-communication STATIC ${communication_src_files}) target_link_libraries(mg-communication Threads::Threads mg-utils mg-io fmt glog gflags) target_link_libraries(mg-communication ${OPENSSL_LIBRARIES}) target_include_directories(mg-communication SYSTEM PUBLIC ${OPENSSL_INCLUDE_DIR}) - -set(communication_rpc_src_files - rpc/client.cpp - rpc/protocol.cpp - rpc/server.cpp) - -add_library(mg-comm-rpc STATIC ${communication_rpc_src_files}) -target_link_libraries(mg-comm-rpc Threads::Threads mg-communication mg-utils mg-io fmt glog gflags) -target_link_libraries(mg-comm-rpc mg-slk) diff --git a/src/lisp/types.lisp b/src/lisp/types.lisp index f53637bf1..e5fbd0a09 100644 --- a/src/lisp/types.lisp +++ b/src/lisp/types.lisp @@ -1502,7 +1502,7 @@ DEFINE-RPC introduces the following additional member options: (res-name (format nil "~ARes" name)) (rpc-decl #>cpp - using ${rpc-name} = communication::rpc::RequestResponse<${req-name}, ${res-name}>; + using ${rpc-name} = rpc::RequestResponse<${req-name}, ${res-name}>; cpp<#) (request-body (cdr (assoc :request options))) (response-body (cdr (assoc :response options))) diff --git a/src/raft/coordination.hpp b/src/raft/coordination.hpp index 4960b3721..ef36605f8 100644 --- a/src/raft/coordination.hpp +++ b/src/raft/coordination.hpp @@ -13,10 +13,10 @@ #include -#include "communication/rpc/client.hpp" -#include "communication/rpc/server.hpp" #include "io/network/endpoint.hpp" #include "raft/exceptions.hpp" +#include "rpc/client.hpp" +#include "rpc/server.hpp" namespace raft { @@ -86,8 +86,8 @@ class Coordination final { if (!client) { const auto &endpoint = endpoints_[other_id - 1]; - client = std::make_unique( - endpoint, &client_context_.value()); + client = + std::make_unique(endpoint, &client_context_.value()); } try { @@ -129,11 +129,11 @@ class Coordination final { uint16_t cluster_size_; std::optional server_context_; - std::optional server_; + std::optional server_; std::optional client_context_; std::vector endpoints_; - std::vector> clients_; + std::vector> clients_; std::vector> client_locks_; std::atomic alive_{true}; diff --git a/src/raft/raft_rpc_messages.lcp b/src/raft/raft_rpc_messages.lcp index 35c3fffe8..dd64f645b 100644 --- a/src/raft/raft_rpc_messages.lcp +++ b/src/raft/raft_rpc_messages.lcp @@ -4,8 +4,8 @@ #include #include -#include "communication/rpc/messages.hpp" #include "raft/log_entry.hpp" +#include "rpc/messages.hpp" cpp<# (lcp:namespace raft) diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 80134d320..285c09937 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -10,10 +10,10 @@ #include #include -#include "communication/rpc/client.hpp" #include "database/graph_db_accessor.hpp" #include "durability/single_node_ha/paths.hpp" #include "raft/exceptions.hpp" +#include "rpc/client.hpp" #include "slk/streams.hpp" #include "utils/cast.hpp" #include "utils/exceptions.hpp" @@ -719,7 +719,7 @@ void RaftServer::HBThreadMain(uint16_t peer_id) { // The heartbeat thread uses a dedicated RPC client for its peer so that it // can issue heartbeats in parallel with other RPC requests that are being // issued to the peer (replication, voting, etc.) - std::unique_ptr rpc_client; + std::unique_ptr rpc_client; while (!exiting_) { TimePoint wait_until; @@ -736,7 +736,7 @@ void RaftServer::HBThreadMain(uint16_t peer_id) { lock.unlock(); if (!rpc_client) { - rpc_client = std::make_unique( + rpc_client = std::make_unique( coordination_->GetOtherNodeEndpoint(peer_id), coordination_->GetRpcClientContext()); } diff --git a/src/raft/storage_info_rpc_messages.lcp b/src/raft/storage_info_rpc_messages.lcp index d2f0ef7dc..c9c7edf94 100644 --- a/src/raft/storage_info_rpc_messages.lcp +++ b/src/raft/storage_info_rpc_messages.lcp @@ -4,7 +4,7 @@ #include #include -#include "communication/rpc/messages.hpp" +#include "rpc/messages.hpp" #include "slk/serialization.hpp" cpp<# diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt new file mode 100644 index 000000000..41df5d635 --- /dev/null +++ b/src/rpc/CMakeLists.txt @@ -0,0 +1,8 @@ +set(rpc_src_files + client.cpp + protocol.cpp + server.cpp) + +add_library(mg-rpc STATIC ${rpc_src_files}) +target_link_libraries(mg-rpc Threads::Threads mg-communication mg-utils mg-io fmt glog gflags) +target_link_libraries(mg-rpc mg-slk) diff --git a/src/communication/rpc/client.cpp b/src/rpc/client.cpp similarity index 76% rename from src/communication/rpc/client.cpp rename to src/rpc/client.cpp index 60dec2914..a497c8254 100644 --- a/src/communication/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -1,6 +1,6 @@ -#include "communication/rpc/client.hpp" +#include "rpc/client.hpp" -namespace communication::rpc { +namespace rpc { Client::Client(const io::network::Endpoint &endpoint, communication::ClientContext *context) @@ -14,4 +14,4 @@ void Client::Abort() { client_ = std::nullopt; } -} // namespace communication::rpc +} // namespace rpc diff --git a/src/communication/rpc/client.hpp b/src/rpc/client.hpp similarity index 96% rename from src/communication/rpc/client.hpp rename to src/rpc/client.hpp index 2ac3e0845..452f8ddb0 100644 --- a/src/communication/rpc/client.hpp +++ b/src/rpc/client.hpp @@ -7,14 +7,14 @@ #include #include "communication/client.hpp" -#include "communication/rpc/exceptions.hpp" -#include "communication/rpc/messages.hpp" #include "io/network/endpoint.hpp" +#include "rpc/exceptions.hpp" +#include "rpc/messages.hpp" #include "slk/serialization.hpp" #include "slk/streams.hpp" #include "utils/on_scope_exit.hpp" -namespace communication::rpc { +namespace rpc { /// Client is thread safe, but it is recommended to use thread_local clients. class Client { @@ -128,4 +128,4 @@ class Client { std::mutex mutex_; }; -} // namespace communication::rpc +} // namespace rpc diff --git a/src/communication/rpc/client_pool.hpp b/src/rpc/client_pool.hpp similarity index 93% rename from src/communication/rpc/client_pool.hpp rename to src/rpc/client_pool.hpp index 6d447029d..d345d237b 100644 --- a/src/communication/rpc/client_pool.hpp +++ b/src/rpc/client_pool.hpp @@ -3,9 +3,9 @@ #include #include -#include "communication/rpc/client.hpp" +#include "rpc/client.hpp" -namespace communication::rpc { +namespace rpc { /** * A simple client pool that creates new RPC clients on demand. Useful when you @@ -62,4 +62,4 @@ class ClientPool { std::stack> unused_clients_; }; -} // namespace communication::rpc +} // namespace rpc diff --git a/src/communication/rpc/exceptions.hpp b/src/rpc/exceptions.hpp similarity index 93% rename from src/communication/rpc/exceptions.hpp rename to src/rpc/exceptions.hpp index 07452faca..c605e0ff5 100644 --- a/src/communication/rpc/exceptions.hpp +++ b/src/rpc/exceptions.hpp @@ -1,7 +1,7 @@ #include "io/network/endpoint.hpp" #include "utils/exceptions.hpp" -namespace communication::rpc { +namespace rpc { /// Exception that is thrown whenever a RPC call fails. /// This exception inherits `std::exception` directly because @@ -22,4 +22,4 @@ class RpcFailedException final : public utils::BasicException { private: io::network::Endpoint endpoint_; }; -} // namespace communication::rpc +} // namespace rpc diff --git a/src/communication/rpc/messages.hpp b/src/rpc/messages.hpp similarity index 91% rename from src/communication/rpc/messages.hpp rename to src/rpc/messages.hpp index 327101a81..9b9ec6feb 100644 --- a/src/communication/rpc/messages.hpp +++ b/src/rpc/messages.hpp @@ -5,7 +5,7 @@ #include "utils/typeinfo.hpp" -namespace communication::rpc { +namespace rpc { using MessageSize = uint32_t; @@ -23,4 +23,4 @@ struct RequestResponse { using Response = TResponse; }; -} // namespace communication::rpc +} // namespace rpc diff --git a/src/communication/rpc/protocol.cpp b/src/rpc/protocol.cpp similarity index 93% rename from src/communication/rpc/protocol.cpp rename to src/rpc/protocol.cpp index 7e5cadb19..71a831805 100644 --- a/src/communication/rpc/protocol.cpp +++ b/src/rpc/protocol.cpp @@ -1,12 +1,12 @@ -#include "communication/rpc/protocol.hpp" +#include "rpc/protocol.hpp" -#include "communication/rpc/messages.hpp" -#include "communication/rpc/server.hpp" +#include "rpc/messages.hpp" +#include "rpc/server.hpp" #include "slk/serialization.hpp" #include "slk/streams.hpp" #include "utils/on_scope_exit.hpp" -namespace communication::rpc { +namespace rpc { Session::Session(Server *server, const io::network::Endpoint &endpoint, communication::InputStream *input_stream, @@ -76,4 +76,4 @@ void Session::Execute() { : extended_it->second.res_type.name); } -} // namespace communication::rpc +} // namespace rpc diff --git a/src/communication/rpc/protocol.hpp b/src/rpc/protocol.hpp similarity index 92% rename from src/communication/rpc/protocol.hpp rename to src/rpc/protocol.hpp index d6f9f5e50..94210f207 100644 --- a/src/communication/rpc/protocol.hpp +++ b/src/rpc/protocol.hpp @@ -4,8 +4,8 @@ #include #include -#include "communication/rpc/messages.hpp" #include "communication/session.hpp" +#include "rpc/messages.hpp" /** * @brief Protocol @@ -16,7 +16,7 @@ * Message layout: MessageSize message_size, * message_size bytes serialized_message */ -namespace communication::rpc { +namespace rpc { // Forward declaration of class Server class Server; @@ -54,4 +54,4 @@ class Session { communication::OutputStream *output_stream_; }; -} // namespace communication::rpc +} // namespace rpc diff --git a/src/communication/rpc/server.cpp b/src/rpc/server.cpp similarity index 82% rename from src/communication/rpc/server.cpp rename to src/rpc/server.cpp index 96049960b..175f0ea98 100644 --- a/src/communication/rpc/server.cpp +++ b/src/rpc/server.cpp @@ -1,6 +1,6 @@ -#include "communication/rpc/server.hpp" +#include "rpc/server.hpp" -namespace communication::rpc { +namespace rpc { Server::Server(const io::network::Endpoint &endpoint, communication::ServerContext *context, size_t workers_count) @@ -16,4 +16,4 @@ void Server::AwaitShutdown() { server_.AwaitShutdown(); } const io::network::Endpoint &Server::endpoint() const { return server_.endpoint(); } -} // namespace communication::rpc +} // namespace rpc diff --git a/src/communication/rpc/server.hpp b/src/rpc/server.hpp similarity index 93% rename from src/communication/rpc/server.hpp rename to src/rpc/server.hpp index ede066400..6e5cb4b12 100644 --- a/src/communication/rpc/server.hpp +++ b/src/rpc/server.hpp @@ -4,13 +4,13 @@ #include #include -#include "communication/rpc/messages.hpp" -#include "communication/rpc/protocol.hpp" #include "communication/server.hpp" #include "io/network/endpoint.hpp" +#include "rpc/messages.hpp" +#include "rpc/protocol.hpp" #include "slk/streams.hpp" -namespace communication::rpc { +namespace rpc { class Server { public: @@ -90,6 +90,6 @@ class Server { std::map extended_callbacks_; communication::Server server_; -}; // namespace communication::rpc +}; -} // namespace communication::rpc +} // namespace rpc diff --git a/tests/benchmark/CMakeLists.txt b/tests/benchmark/CMakeLists.txt index 9f5c94751..c9b64e137 100644 --- a/tests/benchmark/CMakeLists.txt +++ b/tests/benchmark/CMakeLists.txt @@ -37,7 +37,7 @@ add_benchmark(query/stripped.cpp) target_link_libraries(${test_prefix}stripped mg-query) add_benchmark(rpc.cpp) -target_link_libraries(${test_prefix}rpc mg-comm-rpc) +target_link_libraries(${test_prefix}rpc mg-rpc) add_benchmark(skip_list_random.cpp) target_link_libraries(${test_prefix}skip_list_random mg-utils) diff --git a/tests/benchmark/rpc.cpp b/tests/benchmark/rpc.cpp index 459423e47..7b56b544a 100644 --- a/tests/benchmark/rpc.cpp +++ b/tests/benchmark/rpc.cpp @@ -3,10 +3,10 @@ #include -#include "communication/rpc/client.hpp" -#include "communication/rpc/client_pool.hpp" -#include "communication/rpc/messages.hpp" -#include "communication/rpc/server.hpp" +#include "rpc/client.hpp" +#include "rpc/client_pool.hpp" +#include "rpc/messages.hpp" +#include "rpc/server.hpp" #include "slk/serialization.hpp" #include "utils/timer.hpp" @@ -38,7 +38,7 @@ void EchoMessage::Save(const EchoMessage &obj, slk::Builder *builder) { const utils::TypeInfo EchoMessage::kType{2, "EchoMessage"}; -using Echo = communication::rpc::RequestResponse; +using Echo = rpc::RequestResponse; const int kThreadsNum = 16; @@ -51,10 +51,10 @@ DEFINE_bool(run_server, true, "Set to false to use external server"); DEFINE_bool(run_benchmark, true, "Set to false to only run server"); std::optional server_context; -std::optional server; +std::optional server; std::optional client_context; -std::optional clients[kThreadsNum]; -std::optional client_pool; +std::optional clients[kThreadsNum]; +std::optional client_pool; static void BenchmarkRpc(benchmark::State &state) { std::string data(state.range(0), 'a'); diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 1e29622af..5c81a7265 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -146,7 +146,7 @@ add_unit_test(network_timeouts.cpp) target_link_libraries(${test_prefix}network_timeouts mg-communication) add_unit_test(rpc.cpp) -target_link_libraries(${test_prefix}rpc mg-comm-rpc) +target_link_libraries(${test_prefix}rpc mg-rpc) # Test data structures diff --git a/tests/unit/rpc.cpp b/tests/unit/rpc.cpp index 1fe992c8b..05e3655d7 100644 --- a/tests/unit/rpc.cpp +++ b/tests/unit/rpc.cpp @@ -3,15 +3,15 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" -#include "communication/rpc/client.hpp" -#include "communication/rpc/client_pool.hpp" -#include "communication/rpc/messages.hpp" -#include "communication/rpc/server.hpp" +#include "rpc/client.hpp" +#include "rpc/client_pool.hpp" +#include "rpc/messages.hpp" +#include "rpc/server.hpp" #include "utils/timer.hpp" #include "rpc_messages.hpp" -using namespace communication::rpc; +using namespace rpc; using namespace std::literals::chrono_literals; namespace slk { @@ -97,8 +97,7 @@ TEST(Rpc, Abort) { }); utils::Timer timer; - EXPECT_THROW(client.Call(10, 20), - communication::rpc::RpcFailedException); + EXPECT_THROW(client.Call(10, 20), RpcFailedException); EXPECT_LT(timer.Elapsed(), 200ms); thread.join(); diff --git a/tests/unit/rpc_messages.hpp b/tests/unit/rpc_messages.hpp index d031ce501..0733c4835 100644 --- a/tests/unit/rpc_messages.hpp +++ b/tests/unit/rpc_messages.hpp @@ -1,6 +1,6 @@ #pragma once -#include "communication/rpc/messages.hpp" +#include "rpc/messages.hpp" #include "slk/serialization.hpp" #include "utils/typeinfo.hpp" @@ -41,7 +41,7 @@ void Save(const SumRes &res, Builder *builder); void Load(SumRes *res, Reader *reader); } // namespace slk -using Sum = communication::rpc::RequestResponse; +using Sum = rpc::RequestResponse; struct EchoMessage { static const utils::TypeInfo kType; @@ -62,4 +62,4 @@ void Save(const EchoMessage &echo, Builder *builder); void Load(EchoMessage *echo, Reader *reader); } // namespace slk -using Echo = communication::rpc::RequestResponse; +using Echo = rpc::RequestResponse;