Move RPC to root source directory

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2641
This commit is contained in:
Matej Ferencevic 2020-01-24 15:30:47 +01:00
parent 7d0590a753
commit de48548164
22 changed files with 71 additions and 72 deletions

View File

@ -10,6 +10,7 @@ add_subdirectory(telemetry)
add_subdirectory(communication) add_subdirectory(communication)
add_subdirectory(auth) add_subdirectory(auth)
add_subdirectory(slk) add_subdirectory(slk)
add_subdirectory(rpc)
add_subdirectory(storage/v2) add_subdirectory(storage/v2)
add_subdirectory(query) add_subdirectory(query)
@ -93,7 +94,7 @@ add_subdirectory(query)
# #
#set(MG_SINGLE_NODE_HA_LIBS stdc++fs Threads::Threads fmt cppitertools #set(MG_SINGLE_NODE_HA_LIBS stdc++fs Threads::Threads fmt cppitertools
# antlr_opencypher_parser_lib dl glog gflags # antlr_opencypher_parser_lib dl glog gflags
# mg-utils mg-io mg-requests mg-communication mg-comm-rpc # mg-utils mg-io mg-requests mg-communication mg-rpc
# mg-auth) # mg-auth)
# #
#if (USE_LTALLOC) #if (USE_LTALLOC)

View File

@ -10,12 +10,3 @@ add_library(mg-communication STATIC ${communication_src_files})
target_link_libraries(mg-communication Threads::Threads mg-utils mg-io fmt glog gflags) target_link_libraries(mg-communication Threads::Threads mg-utils mg-io fmt glog gflags)
target_link_libraries(mg-communication ${OPENSSL_LIBRARIES}) target_link_libraries(mg-communication ${OPENSSL_LIBRARIES})
target_include_directories(mg-communication SYSTEM PUBLIC ${OPENSSL_INCLUDE_DIR}) target_include_directories(mg-communication SYSTEM PUBLIC ${OPENSSL_INCLUDE_DIR})
set(communication_rpc_src_files
rpc/client.cpp
rpc/protocol.cpp
rpc/server.cpp)
add_library(mg-comm-rpc STATIC ${communication_rpc_src_files})
target_link_libraries(mg-comm-rpc Threads::Threads mg-communication mg-utils mg-io fmt glog gflags)
target_link_libraries(mg-comm-rpc mg-slk)

View File

@ -1502,7 +1502,7 @@ DEFINE-RPC introduces the following additional member options:
(res-name (format nil "~ARes" name)) (res-name (format nil "~ARes" name))
(rpc-decl (rpc-decl
#>cpp #>cpp
using ${rpc-name} = communication::rpc::RequestResponse<${req-name}, ${res-name}>; using ${rpc-name} = rpc::RequestResponse<${req-name}, ${res-name}>;
cpp<#) cpp<#)
(request-body (cdr (assoc :request options))) (request-body (cdr (assoc :request options)))
(response-body (cdr (assoc :response options))) (response-body (cdr (assoc :response options)))

View File

@ -13,10 +13,10 @@
#include <glog/logging.h> #include <glog/logging.h>
#include "communication/rpc/client.hpp"
#include "communication/rpc/server.hpp"
#include "io/network/endpoint.hpp" #include "io/network/endpoint.hpp"
#include "raft/exceptions.hpp" #include "raft/exceptions.hpp"
#include "rpc/client.hpp"
#include "rpc/server.hpp"
namespace raft { namespace raft {
@ -86,8 +86,8 @@ class Coordination final {
if (!client) { if (!client) {
const auto &endpoint = endpoints_[other_id - 1]; const auto &endpoint = endpoints_[other_id - 1];
client = std::make_unique<communication::rpc::Client>( client =
endpoint, &client_context_.value()); std::make_unique<rpc::Client>(endpoint, &client_context_.value());
} }
try { try {
@ -129,11 +129,11 @@ class Coordination final {
uint16_t cluster_size_; uint16_t cluster_size_;
std::optional<communication::ServerContext> server_context_; std::optional<communication::ServerContext> server_context_;
std::optional<communication::rpc::Server> server_; std::optional<rpc::Server> server_;
std::optional<communication::ClientContext> client_context_; std::optional<communication::ClientContext> client_context_;
std::vector<io::network::Endpoint> endpoints_; std::vector<io::network::Endpoint> endpoints_;
std::vector<std::unique_ptr<communication::rpc::Client>> clients_; std::vector<std::unique_ptr<rpc::Client>> clients_;
std::vector<std::unique_ptr<std::mutex>> client_locks_; std::vector<std::unique_ptr<std::mutex>> client_locks_;
std::atomic<bool> alive_{true}; std::atomic<bool> alive_{true};

View File

@ -4,8 +4,8 @@
#include <cstring> #include <cstring>
#include <vector> #include <vector>
#include "communication/rpc/messages.hpp"
#include "raft/log_entry.hpp" #include "raft/log_entry.hpp"
#include "rpc/messages.hpp"
cpp<# cpp<#
(lcp:namespace raft) (lcp:namespace raft)

View File

@ -10,10 +10,10 @@
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include <glog/logging.h> #include <glog/logging.h>
#include "communication/rpc/client.hpp"
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "durability/single_node_ha/paths.hpp" #include "durability/single_node_ha/paths.hpp"
#include "raft/exceptions.hpp" #include "raft/exceptions.hpp"
#include "rpc/client.hpp"
#include "slk/streams.hpp" #include "slk/streams.hpp"
#include "utils/cast.hpp" #include "utils/cast.hpp"
#include "utils/exceptions.hpp" #include "utils/exceptions.hpp"
@ -719,7 +719,7 @@ void RaftServer::HBThreadMain(uint16_t peer_id) {
// The heartbeat thread uses a dedicated RPC client for its peer so that it // The heartbeat thread uses a dedicated RPC client for its peer so that it
// can issue heartbeats in parallel with other RPC requests that are being // can issue heartbeats in parallel with other RPC requests that are being
// issued to the peer (replication, voting, etc.) // issued to the peer (replication, voting, etc.)
std::unique_ptr<communication::rpc::Client> rpc_client; std::unique_ptr<rpc::Client> rpc_client;
while (!exiting_) { while (!exiting_) {
TimePoint wait_until; TimePoint wait_until;
@ -736,7 +736,7 @@ void RaftServer::HBThreadMain(uint16_t peer_id) {
lock.unlock(); lock.unlock();
if (!rpc_client) { if (!rpc_client) {
rpc_client = std::make_unique<communication::rpc::Client>( rpc_client = std::make_unique<rpc::Client>(
coordination_->GetOtherNodeEndpoint(peer_id), coordination_->GetOtherNodeEndpoint(peer_id),
coordination_->GetRpcClientContext()); coordination_->GetRpcClientContext());
} }

View File

@ -4,7 +4,7 @@
#include <vector> #include <vector>
#include <string> #include <string>
#include "communication/rpc/messages.hpp" #include "rpc/messages.hpp"
#include "slk/serialization.hpp" #include "slk/serialization.hpp"
cpp<# cpp<#

8
src/rpc/CMakeLists.txt Normal file
View File

@ -0,0 +1,8 @@
set(rpc_src_files
client.cpp
protocol.cpp
server.cpp)
add_library(mg-rpc STATIC ${rpc_src_files})
target_link_libraries(mg-rpc Threads::Threads mg-communication mg-utils mg-io fmt glog gflags)
target_link_libraries(mg-rpc mg-slk)

View File

@ -1,6 +1,6 @@
#include "communication/rpc/client.hpp" #include "rpc/client.hpp"
namespace communication::rpc { namespace rpc {
Client::Client(const io::network::Endpoint &endpoint, Client::Client(const io::network::Endpoint &endpoint,
communication::ClientContext *context) communication::ClientContext *context)
@ -14,4 +14,4 @@ void Client::Abort() {
client_ = std::nullopt; client_ = std::nullopt;
} }
} // namespace communication::rpc } // namespace rpc

View File

@ -7,14 +7,14 @@
#include <glog/logging.h> #include <glog/logging.h>
#include "communication/client.hpp" #include "communication/client.hpp"
#include "communication/rpc/exceptions.hpp"
#include "communication/rpc/messages.hpp"
#include "io/network/endpoint.hpp" #include "io/network/endpoint.hpp"
#include "rpc/exceptions.hpp"
#include "rpc/messages.hpp"
#include "slk/serialization.hpp" #include "slk/serialization.hpp"
#include "slk/streams.hpp" #include "slk/streams.hpp"
#include "utils/on_scope_exit.hpp" #include "utils/on_scope_exit.hpp"
namespace communication::rpc { namespace rpc {
/// Client is thread safe, but it is recommended to use thread_local clients. /// Client is thread safe, but it is recommended to use thread_local clients.
class Client { class Client {
@ -128,4 +128,4 @@ class Client {
std::mutex mutex_; std::mutex mutex_;
}; };
} // namespace communication::rpc } // namespace rpc

View File

@ -3,9 +3,9 @@
#include <mutex> #include <mutex>
#include <stack> #include <stack>
#include "communication/rpc/client.hpp" #include "rpc/client.hpp"
namespace communication::rpc { namespace rpc {
/** /**
* A simple client pool that creates new RPC clients on demand. Useful when you * A simple client pool that creates new RPC clients on demand. Useful when you
@ -62,4 +62,4 @@ class ClientPool {
std::stack<std::unique_ptr<Client>> unused_clients_; std::stack<std::unique_ptr<Client>> unused_clients_;
}; };
} // namespace communication::rpc } // namespace rpc

View File

@ -1,7 +1,7 @@
#include "io/network/endpoint.hpp" #include "io/network/endpoint.hpp"
#include "utils/exceptions.hpp" #include "utils/exceptions.hpp"
namespace communication::rpc { namespace rpc {
/// Exception that is thrown whenever a RPC call fails. /// Exception that is thrown whenever a RPC call fails.
/// This exception inherits `std::exception` directly because /// This exception inherits `std::exception` directly because
@ -22,4 +22,4 @@ class RpcFailedException final : public utils::BasicException {
private: private:
io::network::Endpoint endpoint_; io::network::Endpoint endpoint_;
}; };
} // namespace communication::rpc } // namespace rpc

View File

@ -5,7 +5,7 @@
#include "utils/typeinfo.hpp" #include "utils/typeinfo.hpp"
namespace communication::rpc { namespace rpc {
using MessageSize = uint32_t; using MessageSize = uint32_t;
@ -23,4 +23,4 @@ struct RequestResponse {
using Response = TResponse; using Response = TResponse;
}; };
} // namespace communication::rpc } // namespace rpc

View File

@ -1,12 +1,12 @@
#include "communication/rpc/protocol.hpp" #include "rpc/protocol.hpp"
#include "communication/rpc/messages.hpp" #include "rpc/messages.hpp"
#include "communication/rpc/server.hpp" #include "rpc/server.hpp"
#include "slk/serialization.hpp" #include "slk/serialization.hpp"
#include "slk/streams.hpp" #include "slk/streams.hpp"
#include "utils/on_scope_exit.hpp" #include "utils/on_scope_exit.hpp"
namespace communication::rpc { namespace rpc {
Session::Session(Server *server, const io::network::Endpoint &endpoint, Session::Session(Server *server, const io::network::Endpoint &endpoint,
communication::InputStream *input_stream, communication::InputStream *input_stream,
@ -76,4 +76,4 @@ void Session::Execute() {
: extended_it->second.res_type.name); : extended_it->second.res_type.name);
} }
} // namespace communication::rpc } // namespace rpc

View File

@ -4,8 +4,8 @@
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
#include "communication/rpc/messages.hpp"
#include "communication/session.hpp" #include "communication/session.hpp"
#include "rpc/messages.hpp"
/** /**
* @brief Protocol * @brief Protocol
@ -16,7 +16,7 @@
* Message layout: MessageSize message_size, * Message layout: MessageSize message_size,
* message_size bytes serialized_message * message_size bytes serialized_message
*/ */
namespace communication::rpc { namespace rpc {
// Forward declaration of class Server // Forward declaration of class Server
class Server; class Server;
@ -54,4 +54,4 @@ class Session {
communication::OutputStream *output_stream_; communication::OutputStream *output_stream_;
}; };
} // namespace communication::rpc } // namespace rpc

View File

@ -1,6 +1,6 @@
#include "communication/rpc/server.hpp" #include "rpc/server.hpp"
namespace communication::rpc { namespace rpc {
Server::Server(const io::network::Endpoint &endpoint, Server::Server(const io::network::Endpoint &endpoint,
communication::ServerContext *context, size_t workers_count) communication::ServerContext *context, size_t workers_count)
@ -16,4 +16,4 @@ void Server::AwaitShutdown() { server_.AwaitShutdown(); }
const io::network::Endpoint &Server::endpoint() const { const io::network::Endpoint &Server::endpoint() const {
return server_.endpoint(); return server_.endpoint();
} }
} // namespace communication::rpc } // namespace rpc

View File

@ -4,13 +4,13 @@
#include <mutex> #include <mutex>
#include <vector> #include <vector>
#include "communication/rpc/messages.hpp"
#include "communication/rpc/protocol.hpp"
#include "communication/server.hpp" #include "communication/server.hpp"
#include "io/network/endpoint.hpp" #include "io/network/endpoint.hpp"
#include "rpc/messages.hpp"
#include "rpc/protocol.hpp"
#include "slk/streams.hpp" #include "slk/streams.hpp"
namespace communication::rpc { namespace rpc {
class Server { class Server {
public: public:
@ -90,6 +90,6 @@ class Server {
std::map<uint64_t, RpcExtendedCallback> extended_callbacks_; std::map<uint64_t, RpcExtendedCallback> extended_callbacks_;
communication::Server<Session, Server> server_; communication::Server<Session, Server> server_;
}; // namespace communication::rpc };
} // namespace communication::rpc } // namespace rpc

View File

@ -37,7 +37,7 @@ add_benchmark(query/stripped.cpp)
target_link_libraries(${test_prefix}stripped mg-query) target_link_libraries(${test_prefix}stripped mg-query)
add_benchmark(rpc.cpp) add_benchmark(rpc.cpp)
target_link_libraries(${test_prefix}rpc mg-comm-rpc) target_link_libraries(${test_prefix}rpc mg-rpc)
add_benchmark(skip_list_random.cpp) add_benchmark(skip_list_random.cpp)
target_link_libraries(${test_prefix}skip_list_random mg-utils) target_link_libraries(${test_prefix}skip_list_random mg-utils)

View File

@ -3,10 +3,10 @@
#include <benchmark/benchmark.h> #include <benchmark/benchmark.h>
#include "communication/rpc/client.hpp" #include "rpc/client.hpp"
#include "communication/rpc/client_pool.hpp" #include "rpc/client_pool.hpp"
#include "communication/rpc/messages.hpp" #include "rpc/messages.hpp"
#include "communication/rpc/server.hpp" #include "rpc/server.hpp"
#include "slk/serialization.hpp" #include "slk/serialization.hpp"
#include "utils/timer.hpp" #include "utils/timer.hpp"
@ -38,7 +38,7 @@ void EchoMessage::Save(const EchoMessage &obj, slk::Builder *builder) {
const utils::TypeInfo EchoMessage::kType{2, "EchoMessage"}; const utils::TypeInfo EchoMessage::kType{2, "EchoMessage"};
using Echo = communication::rpc::RequestResponse<EchoMessage, EchoMessage>; using Echo = rpc::RequestResponse<EchoMessage, EchoMessage>;
const int kThreadsNum = 16; const int kThreadsNum = 16;
@ -51,10 +51,10 @@ DEFINE_bool(run_server, true, "Set to false to use external server");
DEFINE_bool(run_benchmark, true, "Set to false to only run server"); DEFINE_bool(run_benchmark, true, "Set to false to only run server");
std::optional<communication::ServerContext> server_context; std::optional<communication::ServerContext> server_context;
std::optional<communication::rpc::Server> server; std::optional<rpc::Server> server;
std::optional<communication::ClientContext> client_context; std::optional<communication::ClientContext> client_context;
std::optional<communication::rpc::Client> clients[kThreadsNum]; std::optional<rpc::Client> clients[kThreadsNum];
std::optional<communication::rpc::ClientPool> client_pool; std::optional<rpc::ClientPool> client_pool;
static void BenchmarkRpc(benchmark::State &state) { static void BenchmarkRpc(benchmark::State &state) {
std::string data(state.range(0), 'a'); std::string data(state.range(0), 'a');

View File

@ -146,7 +146,7 @@ add_unit_test(network_timeouts.cpp)
target_link_libraries(${test_prefix}network_timeouts mg-communication) target_link_libraries(${test_prefix}network_timeouts mg-communication)
add_unit_test(rpc.cpp) add_unit_test(rpc.cpp)
target_link_libraries(${test_prefix}rpc mg-comm-rpc) target_link_libraries(${test_prefix}rpc mg-rpc)
# Test data structures # Test data structures

View File

@ -3,15 +3,15 @@
#include "gmock/gmock.h" #include "gmock/gmock.h"
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "communication/rpc/client.hpp" #include "rpc/client.hpp"
#include "communication/rpc/client_pool.hpp" #include "rpc/client_pool.hpp"
#include "communication/rpc/messages.hpp" #include "rpc/messages.hpp"
#include "communication/rpc/server.hpp" #include "rpc/server.hpp"
#include "utils/timer.hpp" #include "utils/timer.hpp"
#include "rpc_messages.hpp" #include "rpc_messages.hpp"
using namespace communication::rpc; using namespace rpc;
using namespace std::literals::chrono_literals; using namespace std::literals::chrono_literals;
namespace slk { namespace slk {
@ -97,8 +97,7 @@ TEST(Rpc, Abort) {
}); });
utils::Timer timer; utils::Timer timer;
EXPECT_THROW(client.Call<Sum>(10, 20), EXPECT_THROW(client.Call<Sum>(10, 20), RpcFailedException);
communication::rpc::RpcFailedException);
EXPECT_LT(timer.Elapsed(), 200ms); EXPECT_LT(timer.Elapsed(), 200ms);
thread.join(); thread.join();

View File

@ -1,6 +1,6 @@
#pragma once #pragma once
#include "communication/rpc/messages.hpp" #include "rpc/messages.hpp"
#include "slk/serialization.hpp" #include "slk/serialization.hpp"
#include "utils/typeinfo.hpp" #include "utils/typeinfo.hpp"
@ -41,7 +41,7 @@ void Save(const SumRes &res, Builder *builder);
void Load(SumRes *res, Reader *reader); void Load(SumRes *res, Reader *reader);
} // namespace slk } // namespace slk
using Sum = communication::rpc::RequestResponse<SumReq, SumRes>; using Sum = rpc::RequestResponse<SumReq, SumRes>;
struct EchoMessage { struct EchoMessage {
static const utils::TypeInfo kType; static const utils::TypeInfo kType;
@ -62,4 +62,4 @@ void Save(const EchoMessage &echo, Builder *builder);
void Load(EchoMessage *echo, Reader *reader); void Load(EchoMessage *echo, Reader *reader);
} // namespace slk } // namespace slk
using Echo = communication::rpc::RequestResponse<EchoMessage, EchoMessage>; using Echo = rpc::RequestResponse<EchoMessage, EchoMessage>;