Refactor RPC

Summary:
Previously, the RPC stack used the network stack only to receive messages. The
messages were then added to a separate queue that was processed by different
thread pools. This design was inefficient because there was a lock when
inserting and getting messages from the common queue.

This diff removes the need for separate thread pools by utilising the new
network stack design. This is possible because the new network stack allows
full processing of the network request without blocking the whole queue.

Reviewers: buda, florijan, teon.banek, dgleich, mislav.bradac

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1229
This commit is contained in:
Matej Ferencevic 2018-02-23 10:56:56 +01:00
parent 604ebf9d1e
commit c877c87bb4
55 changed files with 245 additions and 440 deletions

View File

@ -19,19 +19,15 @@
namespace communication::raft {
const char *kRaftChannelName = "raft-peer-rpc-channel";
template <class State>
using PeerProtocol = rpc::RequestResponse<PeerRpcRequest<State>, PeerRpcReply>;
template <class State>
class RpcNetwork : public RaftNetworkInterface<State> {
public:
RpcNetwork(rpc::System &system,
RpcNetwork(rpc::Server &server,
std::unordered_map<std::string, io::network::Endpoint> directory)
: system_(system),
directory_(std::move(directory)),
server_(system, kRaftChannelName) {}
: server_(server), directory_(std::move(directory)) {}
virtual void Start(RaftMember<State> &member) override {
server_.Register<PeerProtocol<State>>(
@ -106,15 +102,14 @@ class RpcNetwork : public RaftNetworkInterface<State> {
auto it = clients_.find(id);
if (it == clients_.end()) {
auto ne = directory_[id];
it = clients_.try_emplace(id, ne, kRaftChannelName).first;
it = clients_.try_emplace(id, ne).first;
}
return it->second;
}
rpc::System &system_;
rpc::Server &server_;
// TODO(mtomic): how to update and distribute this?
std::unordered_map<MemberId, io::network::Endpoint> directory_;
rpc::Server server_;
std::unordered_map<MemberId, rpc::Client> clients_;
};

View File

@ -9,15 +9,11 @@
namespace communication::rpc {
Client::Client(const io::network::Endpoint &endpoint,
const std::string &service_name)
: endpoint_(endpoint), service_name_(service_name) {}
Client::Client(const io::network::Endpoint &endpoint) : endpoint_(endpoint) {}
std::unique_ptr<Message> Client::Call(std::unique_ptr<Message> request) {
std::lock_guard<std::mutex> guard(mutex_);
uint32_t request_id = ++next_message_id_;
// Check if the connection is broken (if we haven't used the client for a
// long time the server could have died).
if (socket_ && socket_->ErrorStatus()) {
@ -35,30 +31,6 @@ std::unique_ptr<Message> Client::Call(std::unique_ptr<Message> request) {
}
socket_->SetKeepAlive();
// Send service name size.
MessageSize service_len = service_name_.size();
if (!socket_->Write(reinterpret_cast<uint8_t *>(&service_len),
sizeof(MessageSize), true)) {
LOG(ERROR) << "Couldn't send service name size!";
socket_ = std::experimental::nullopt;
return nullptr;
}
// Send service name.
if (!socket_->Write(service_name_)) {
LOG(ERROR) << "Couldn't send service name!";
socket_ = std::experimental::nullopt;
return nullptr;
}
}
// Send current request ID.
if (!socket_->Write(reinterpret_cast<uint8_t *>(&request_id),
sizeof(uint32_t), true)) {
LOG(ERROR) << "Couldn't send request ID!";
socket_ = std::experimental::nullopt;
return nullptr;
}
// Serialize and send request.
@ -99,12 +71,10 @@ std::unique_ptr<Message> Client::Call(std::unique_ptr<Message> request) {
}
buffer_.Written(received);
if (buffer_.size() < sizeof(uint32_t) + sizeof(MessageSize)) continue;
uint32_t response_id = *reinterpret_cast<uint32_t *>(buffer_.data());
if (buffer_.size() < sizeof(MessageSize)) continue;
MessageSize response_data_size =
*reinterpret_cast<MessageSize *>(buffer_.data() + sizeof(uint32_t));
size_t response_size =
sizeof(uint32_t) + sizeof(MessageSize) + response_data_size;
*reinterpret_cast<MessageSize *>(buffer_.data());
size_t response_size = sizeof(MessageSize) + response_data_size;
buffer_.Resize(response_size);
if (buffer_.size() < response_size) continue;
@ -113,8 +83,7 @@ std::unique_ptr<Message> Client::Call(std::unique_ptr<Message> request) {
std::stringstream response_stream(std::ios_base::in |
std::ios_base::binary);
response_stream.str(std::string(
reinterpret_cast<char *>(buffer_.data() + sizeof(uint32_t) +
sizeof(MessageSize)),
reinterpret_cast<char *>(buffer_.data() + sizeof(MessageSize)),
response_data_size));
boost::archive::binary_iarchive response_archive(response_stream);
response_archive >> response;
@ -122,12 +91,6 @@ std::unique_ptr<Message> Client::Call(std::unique_ptr<Message> request) {
buffer_.Shift(response_size);
if (response_id != request_id) {
// This can happen if some stale response arrives after we issued a new
// request.
continue;
}
return response;
}
}

View File

@ -18,7 +18,7 @@ namespace communication::rpc {
// Client is thread safe, but it is recommended to use thread_local clients.
class Client {
public:
Client(const io::network::Endpoint &endpoint, const std::string &name);
Client(const io::network::Endpoint &endpoint);
// Call function can initiate only one request at the time. Function blocks
// until there is a response. If there was an error nullptr is returned.
@ -31,7 +31,7 @@ class Client {
static_assert(std::is_base_of<Message, Res>::value,
"TRequestResponse::Response must be derived from Message");
std::string request_name =
fmt::format("rpc.client.{}.{}", service_name_,
fmt::format("rpc.client.{}",
utils::Demangle(typeid(Req).name()).value_or("unknown"));
std::unique_ptr<Message> response = nullptr;
stats::Stopwatch(request_name, [&] {
@ -57,11 +57,8 @@ class Client {
std::unique_ptr<Message> Call(std::unique_ptr<Message> request);
io::network::Endpoint endpoint_;
std::string service_name_;
std::experimental::optional<io::network::Socket> socket_;
uint32_t next_message_id_{0};
Buffer buffer_;
std::mutex mutex_;

View File

@ -14,8 +14,7 @@ namespace communication::rpc {
*/
class ClientPool {
public:
ClientPool(const io::network::Endpoint &endpoint, const std::string &name)
: endpoint_(endpoint), name_(name) {}
ClientPool(const io::network::Endpoint &endpoint) : endpoint_(endpoint) {}
template <typename TRequestResponse, typename... Args>
std::unique_ptr<typename TRequestResponse::Response> Call(Args &&... args) {
@ -23,7 +22,7 @@ class ClientPool {
std::unique_lock<std::mutex> lock(mutex_);
if (unused_clients_.empty()) {
client = std::make_unique<Client>(endpoint_, name_);
client = std::make_unique<Client>(endpoint_);
} else {
client = std::move(unused_clients_.top());
unused_clients_.pop();
@ -39,7 +38,6 @@ class ClientPool {
private:
io::network::Endpoint endpoint_;
std::string name_;
std::mutex mutex_;
std::stack<std::unique_ptr<Client>> unused_clients_;

View File

@ -4,92 +4,83 @@
#include "boost/archive/binary_oarchive.hpp"
#include "boost/serialization/unique_ptr.hpp"
#include "fmt/format.h"
#include "glog/logging.h"
#include "communication/rpc/messages-inl.hpp"
#include "communication/rpc/messages.hpp"
#include "communication/rpc/protocol.hpp"
#include "communication/rpc/server.hpp"
#include "stats/metrics.hpp"
#include "utils/demangle.hpp"
namespace communication::rpc {
Session::Session(Socket &&socket, System &system)
: socket_(std::make_shared<Socket>(std::move(socket))), system_(system) {}
Session::Session(Socket &&socket, Server &server)
: socket_(std::move(socket)), server_(server) {}
void Session::Execute() {
if (!handshake_done_) {
if (buffer_.size() < sizeof(MessageSize)) return;
MessageSize service_len = *reinterpret_cast<MessageSize *>(buffer_.data());
buffer_.Resize(sizeof(MessageSize) + service_len);
if (buffer_.size() < sizeof(MessageSize) + service_len) return;
service_name_ = std::string(
reinterpret_cast<char *>(buffer_.data() + sizeof(MessageSize)),
service_len);
buffer_.Shift(sizeof(MessageSize) + service_len);
handshake_done_ = true;
}
if (buffer_.size() < sizeof(uint32_t) + sizeof(MessageSize)) return;
uint32_t message_id = *reinterpret_cast<uint32_t *>(buffer_.data());
MessageSize message_len =
*reinterpret_cast<MessageSize *>(buffer_.data() + sizeof(uint32_t));
uint64_t request_size = sizeof(uint32_t) + sizeof(MessageSize) + message_len;
if (buffer_.size() < sizeof(MessageSize)) return;
MessageSize request_len = *reinterpret_cast<MessageSize *>(buffer_.data());
uint64_t request_size = sizeof(MessageSize) + request_len;
buffer_.Resize(request_size);
if (buffer_.size() < request_size) return;
// TODO (mferencevic): check for exceptions
std::unique_ptr<Message> message;
std::unique_ptr<Message> request;
{
std::stringstream stream(std::ios_base::in | std::ios_base::binary);
stream.str(
std::string(reinterpret_cast<char *>(buffer_.data() + sizeof(uint32_t) +
sizeof(MessageSize)),
message_len));
stream.str(std::string(
reinterpret_cast<char *>(buffer_.data() + sizeof(MessageSize)),
request_len));
boost::archive::binary_iarchive archive(stream);
archive >> message;
archive >> request;
}
buffer_.Shift(sizeof(uint32_t) + sizeof(MessageSize) + message_len);
buffer_.Shift(sizeof(MessageSize) + request_len);
system_.AddTask(socket_, service_name_, message_id, std::move(message));
auto callbacks_accessor = server_.callbacks_.access();
auto it = callbacks_accessor.find(request->type_index());
if (it == callbacks_accessor.end()) {
// Throw exception to close the socket and cleanup the session.
throw SessionException(
"Session trying to execute an unregistered RPC call!");
}
auto request_name = fmt::format(
"rpc.server.{}",
utils::Demangle(request->type_index().name()).value_or("unknown"));
std::unique_ptr<Message> response = nullptr;
stats::Stopwatch(request_name,
[&] { response = it->second(*(request.get())); });
if (!response) {
throw SessionException("Trying to send nullptr instead of message");
}
// Serialize and send response
std::stringstream stream(std::ios_base::out | std::ios_base::binary);
{
boost::archive::binary_oarchive archive(stream);
archive << response;
// Archive destructor ensures everything is written.
}
const std::string &buffer = stream.str();
if (buffer.size() > std::numeric_limits<MessageSize>::max()) {
throw SessionException(fmt::format(
"Trying to send response of size {}, max response size is {}",
buffer.size(), std::numeric_limits<MessageSize>::max()));
}
MessageSize buffer_size = buffer.size();
if (!socket_.Write(reinterpret_cast<uint8_t *>(&buffer_size),
sizeof(MessageSize), true)) {
throw SessionException("Couldn't send response size!");
}
if (!socket_.Write(buffer)) {
throw SessionException("Couldn't send response data!");
}
}
StreamBuffer Session::Allocate() { return buffer_.Allocate(); }
void Session::Written(size_t len) { buffer_.Written(len); }
void SendMessage(Socket &socket, uint32_t message_id,
std::unique_ptr<Message> &message) {
CHECK(message) << "Trying to send nullptr instead of message";
// Serialize and send message
std::stringstream stream(std::ios_base::out | std::ios_base::binary);
{
boost::archive::binary_oarchive archive(stream);
archive << message;
// Archive destructor ensures everything is written.
}
const std::string &buffer = stream.str();
CHECK(buffer.size() <= std::numeric_limits<MessageSize>::max())
<< fmt::format(
"Trying to send message of size {}, max message size is {}",
buffer.size(), std::numeric_limits<MessageSize>::max());
if (!socket.Write(reinterpret_cast<uint8_t *>(&message_id), sizeof(uint32_t),
true)) {
LOG(WARNING) << "Couldn't send message id!";
return;
}
MessageSize buffer_size = buffer.size();
if (!socket.Write(reinterpret_cast<uint8_t *>(&buffer_size),
sizeof(MessageSize), true)) {
LOG(WARNING) << "Couldn't send message size!";
return;
}
if (!socket.Write(buffer)) {
LOG(WARNING) << "Couldn't send message data!";
return;
}
}
} // namespace communication::rpc

View File

@ -16,9 +16,7 @@
* Has classes and functions that implement the server side of our
* RPC protocol.
*
* Handshake layout: MessageSize service_size, service_size characters service
*
* Message layout: uint32_t message_id, MessageSize message_size,
* Message layout: MessageSize message_size,
* message_size bytes serialized_message
*/
namespace communication::rpc {
@ -27,8 +25,16 @@ using Endpoint = io::network::Endpoint;
using Socket = io::network::Socket;
using StreamBuffer = io::network::StreamBuffer;
// Forward declaration of class System
class System;
// Forward declaration of class Server
class Server;
/**
* This class is thrown when the Session wants to indicate that a fatal error
* occured during execution.
*/
class SessionException : public utils::BasicException {
using utils::BasicException::BasicException;
};
/**
* Distributed Protocol Session
@ -37,9 +43,9 @@ class System;
*/
class Session {
public:
Session(Socket &&socket, System &system);
Session(Socket &&socket, Server &server);
int Id() const { return socket_->fd(); }
int Id() const { return socket_.fd(); }
/**
* Executes the protocol after data has been read into the buffer.
@ -66,7 +72,7 @@ class Session {
bool TimedOut() { return false; }
Socket &socket() { return *socket_; }
Socket &socket() { return socket_; }
void RefreshLastEventTime(
const std::chrono::time_point<std::chrono::steady_clock>
@ -75,21 +81,12 @@ class Session {
}
private:
std::shared_ptr<Socket> socket_;
Socket socket_;
std::chrono::time_point<std::chrono::steady_clock> last_event_time_ =
std::chrono::steady_clock::now();
System &system_;
std::string service_name_;
bool handshake_done_{false};
Server &server_;
Buffer buffer_;
};
/**
* Distributed Protocol Server Send Message
*/
void SendMessage(Socket &socket, uint32_t message_id,
std::unique_ptr<Message> &message);
} // namespace communication::rpc

View File

@ -6,93 +6,19 @@
#include "boost/serialization/unique_ptr.hpp"
#include "communication/rpc/server.hpp"
#include "stats/metrics.hpp"
#include "utils/demangle.hpp"
namespace communication::rpc {
System::System(const io::network::Endpoint &endpoint,
const size_t workers_count)
Server::Server(const io::network::Endpoint &endpoint,
size_t workers_count)
: server_(endpoint, *this, false, workers_count) {}
System::~System() {}
void System::AddTask(std::shared_ptr<Socket> socket, const std::string &service,
uint64_t message_id, std::unique_ptr<Message> message) {
std::unique_lock<std::mutex> guard(mutex_);
auto it = services_.find(service);
if (it == services_.end()) return;
it->second->queue_.Emplace(std::move(socket), message_id, std::move(message));
void Server::StopProcessingCalls() {
server_.Shutdown();
server_.AwaitShutdown();
}
void System::Add(Server &server) {
std::unique_lock<std::mutex> guard(mutex_);
auto got = services_.emplace(server.service_name(), &server);
CHECK(got.second) << fmt::format("Server with name {} already exists",
server.service_name());
const io::network::Endpoint &Server::endpoint() const {
return server_.endpoint();
}
void System::Remove(const Server &server) {
std::unique_lock<std::mutex> guard(mutex_);
auto it = services_.find(server.service_name());
CHECK(it != services_.end()) << "Trying to delete nonexisting server";
services_.erase(it);
}
std::string RequestName(const std::string &service_name,
const std::type_index &msg_type_id) {
int s;
char *message_type = abi::__cxa_demangle(msg_type_id.name(), NULL, NULL, &s);
std::string ret;
if (s == 0) {
ret = fmt::format("rpc.server.{}.{}", service_name, message_type);
} else {
ret = fmt::format("rpc.server.{}.unknown", service_name);
}
free(message_type);
return ret;
}
Server::Server(System &system, const std::string &service_name,
int workers_count)
: system_(system), service_name_(service_name) {
system_.Add(*this);
stats::Gauge &queue_size =
stats::GetGauge(fmt::format("rpc.server.{}.queue_size", service_name));
for (int i = 0; i < workers_count; ++i) {
threads_.push_back(std::thread([this, service_name, &queue_size]() {
// TODO: Add logging.
while (alive_) {
auto task = queue_.AwaitPop();
queue_size.Set(queue_.size());
if (!task) continue;
auto socket = std::move(std::get<0>(*task));
auto message_id = std::get<1>(*task);
auto message = std::move(std::get<2>(*task));
auto callbacks_accessor = callbacks_.access();
auto it = callbacks_accessor.find(message->type_index());
if (it == callbacks_accessor.end()) continue;
auto request_name = fmt::format(
"rpc.server.{}.{}", service_name,
utils::Demangle(message->type_index().name()).value_or("unknown"));
std::unique_ptr<Message> response = nullptr;
stats::Stopwatch(request_name,
[&] { response = it->second(*(message.get())); });
SendMessage(*socket, message_id, response);
}
}));
}
}
Server::~Server() {
alive_.store(false);
queue_.Shutdown();
for (auto &thread : threads_) {
if (thread.joinable()) thread.join();
}
system_.Remove(*this);
}
} // namespace communication::rpc

View File

@ -13,46 +13,18 @@
namespace communication::rpc {
// Forward declaration of Server class
class Server;
class System {
public:
System(const io::network::Endpoint &endpoint,
const size_t workers_count = std::thread::hardware_concurrency());
System(const System &) = delete;
System(System &&) = delete;
System &operator=(const System &) = delete;
System &operator=(System &&) = delete;
~System();
const io::network::Endpoint &endpoint() const { return server_.endpoint(); }
private:
using ServerT = communication::Server<Session, System>;
friend class Session;
friend class Server;
/** Start a threadpool that relays the messages from the sockets to the
* LocalEventStreams */
void StartServer(int workers_count);
void AddTask(std::shared_ptr<Socket> socket, const std::string &service,
uint64_t message_id, std::unique_ptr<Message> message);
void Add(Server &server);
void Remove(const Server &server);
std::mutex mutex_;
// Service name to its server mapping.
std::unordered_map<std::string, Server *> services_;
ServerT server_;
};
class Server {
public:
Server(System &system, const std::string &name,
int workers_count = std::thread::hardware_concurrency());
~Server();
Server(const io::network::Endpoint &endpoint,
size_t workers_count = std::thread::hardware_concurrency());
Server(const Server &) = delete;
Server(Server &&) = delete;
Server &operator=(const Server &) = delete;
Server &operator=(Server &&) = delete;
void StopProcessingCalls();
const io::network::Endpoint &endpoint() const;
template <typename TRequestResponse>
void Register(
@ -77,19 +49,15 @@ class Server {
CHECK(got.second) << "Callback for that message type already registered";
}
const std::string &service_name() const { return service_name_; }
private:
friend class System;
System &system_;
Queue<std::tuple<std::shared_ptr<Socket>, uint64_t, std::unique_ptr<Message>>>
queue_;
std::string service_name_;
friend class Session;
ConcurrentMap<std::type_index,
std::function<std::unique_ptr<Message>(const Message &)>>
callbacks_;
std::atomic<bool> alive_{true};
std::vector<std::thread> threads_;
std::mutex mutex_;
communication::Server<Session, Server> server_;
};
} // namespace communication::rpc

View File

@ -47,6 +47,10 @@ DEFINE_VALIDATED_HIDDEN_int32(
"indicates the port on which to serve. If zero (default value), a port is "
"chosen at random. Sent to the master when registring worker node.",
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
DEFINE_VALIDATED_HIDDEN_int32(rpc_num_workers,
std::max(std::thread::hardware_concurrency(), 1U),
"Number of workers (RPC)",
FLAG_IN_RANGE(1, INT32_MAX));
database::Config::Config()
// Durability flags.
@ -59,6 +63,7 @@ database::Config::Config()
// Misc flags.
gc_cycle_sec{FLAGS_gc_cycle_sec},
query_execution_time_sec{FLAGS_query_execution_time_sec},
rpc_num_workers{FLAGS_rpc_num_workers},
// Distributed flags.
worker_id{FLAGS_worker_id},
master_endpoint{FLAGS_master_host,

View File

@ -7,8 +7,6 @@
namespace database {
const std::string kCountersRpc = "CountersRpc";
RPC_SINGLE_MEMBER_MESSAGE(CountersGetReq, std::string);
RPC_SINGLE_MEMBER_MESSAGE(CountersGetRes, int64_t);
using CountersGetRpc =
@ -32,8 +30,8 @@ void SingleNodeCounters::Set(const std::string &name, int64_t value) {
if (!name_counter_pair.second) name_counter_pair.first->second.store(value);
}
MasterCounters::MasterCounters(communication::rpc::System &system)
: rpc_server_(system, kCountersRpc) {
MasterCounters::MasterCounters(communication::rpc::Server &server)
: rpc_server_(server) {
rpc_server_.Register<CountersGetRpc>([this](const CountersGetReq &req) {
return std::make_unique<CountersGetRes>(Get(req.member));
});
@ -44,7 +42,7 @@ MasterCounters::MasterCounters(communication::rpc::System &system)
}
WorkerCounters::WorkerCounters(const io::network::Endpoint &master_endpoint)
: rpc_client_pool_(master_endpoint, kCountersRpc) {}
: rpc_client_pool_(master_endpoint) {}
int64_t WorkerCounters::Get(const std::string &name) {
auto response = rpc_client_pool_.Call<CountersGetRpc>(name);

View File

@ -45,10 +45,10 @@ class SingleNodeCounters : public Counters {
/** Implementation for distributed master. */
class MasterCounters : public SingleNodeCounters {
public:
MasterCounters(communication::rpc::System &system);
MasterCounters(communication::rpc::Server &server);
private:
communication::rpc::Server rpc_server_;
communication::rpc::Server &rpc_server_;
};
/** Implementation for distributed worker. */

View File

@ -161,20 +161,27 @@ class Master : public PrivateBase {
return index_rpc_clients_;
}
communication::rpc::System system_{config_.master_endpoint};
tx::MasterEngine tx_engine_{system_, &wal_};
~Master() {
// The server is stopped explicitly here to disable RPC calls during the
// destruction of this object. This works because this destructor is called
// before the destructors of all objects.
server_.StopProcessingCalls();
}
communication::rpc::Server server_{
config_.master_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
tx::MasterEngine tx_engine_{server_, &wal_};
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
distributed::MasterCoordination coordination_{system_};
TypemapPack<MasterConcurrentIdMapper> typemap_pack_{system_};
database::MasterCounters counters_{system_};
distributed::RemoteDataRpcServer remote_data_server_{*this, system_};
distributed::MasterCoordination coordination_{server_};
TypemapPack<MasterConcurrentIdMapper> typemap_pack_{server_};
database::MasterCounters counters_{server_};
distributed::RemoteDataRpcServer remote_data_server_{*this, server_};
distributed::RemoteDataRpcClients remote_data_clients_{coordination_};
distributed::PlanDispatcher plan_dispatcher_{coordination_};
distributed::RemotePullRpcClients remote_pull_clients_{coordination_};
distributed::RpcWorkerClients index_rpc_clients_{coordination_,
distributed::kIndexRpcName};
distributed::RpcWorkerClients index_rpc_clients_{coordination_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, tx_engine_,
system_};
server_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_};
distributed::RemoteDataManager remote_data_manager_{tx_engine_,
remote_data_clients_};
@ -196,21 +203,29 @@ class Worker : public PrivateBase {
return remote_produce_server_;
}
communication::rpc::System system_{config_.worker_endpoint};
distributed::WorkerCoordination coordination_{system_,
~Worker() {
// The server is stopped explicitly here to disable RPC calls during the
// destruction of this object. This works because this destructor is called
// before the destructors of all objects.
server_.StopProcessingCalls();
}
communication::rpc::Server server_{
config_.worker_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
distributed::WorkerCoordination coordination_{server_,
config_.master_endpoint};
tx::WorkerEngine tx_engine_{config_.master_endpoint};
StorageGc storage_gc_{storage_, tx_engine_, config_.gc_cycle_sec};
TypemapPack<WorkerConcurrentIdMapper> typemap_pack_{config_.master_endpoint};
database::WorkerCounters counters_{config_.master_endpoint};
distributed::RemoteDataRpcServer remote_data_server_{*this, system_};
distributed::RemoteDataRpcServer remote_data_server_{*this, server_};
distributed::RemoteDataRpcClients remote_data_clients_{coordination_};
distributed::PlanConsumer plan_consumer_{system_};
distributed::PlanConsumer plan_consumer_{server_};
distributed::RemoteProduceRpcServer remote_produce_server_{
*this, tx_engine_, system_, plan_consumer_};
distributed::IndexRpcServer index_rpc_server_{*this, system_};
*this, tx_engine_, server_, plan_consumer_};
distributed::IndexRpcServer index_rpc_server_{*this, server_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, tx_engine_,
system_};
server_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_};
distributed::RemoteDataManager remote_data_manager_{tx_engine_,
remote_data_clients_};
@ -334,7 +349,7 @@ Master::Master(Config config)
: MasterBase(std::make_unique<impl::Master>(config)) {}
io::network::Endpoint Master::endpoint() const {
return dynamic_cast<impl::Master *>(impl_.get())->system_.endpoint();
return dynamic_cast<impl::Master *>(impl_.get())->server_.endpoint();
}
io::network::Endpoint Master::GetEndpoint(int worker_id) {
@ -346,7 +361,7 @@ Worker::Worker(Config config)
: PublicBase(std::make_unique<impl::Worker>(config)) {}
io::network::Endpoint Worker::endpoint() const {
return dynamic_cast<impl::Worker *>(impl_.get())->system_.endpoint();
return dynamic_cast<impl::Worker *>(impl_.get())->server_.endpoint();
}
io::network::Endpoint Worker::GetEndpoint(int worker_id) {

View File

@ -43,6 +43,7 @@ struct Config {
// Misc flags.
int gc_cycle_sec;
int query_execution_time_sec;
int rpc_num_workers;
// Distributed master/worker flags.
int worker_id;

View File

@ -3,10 +3,10 @@
namespace distributed {
MasterCoordination::MasterCoordination(communication::rpc::System &system)
: server_(system, kCoordinationServerName) {
MasterCoordination::MasterCoordination(communication::rpc::Server &server)
: server_(server) {
// The master is always worker 0.
workers_.emplace(0, system.endpoint());
workers_.emplace(0, server_.endpoint());
server_.Register<RegisterWorkerRpc>([this](const RegisterWorkerReq &req) {
auto worker_id = RegisterWorker(req.desired_worker_id, req.endpoint);
@ -43,7 +43,7 @@ MasterCoordination::~MasterCoordination() {
for (const auto &kv : workers_) {
// Skip master (self).
if (kv.first == 0) continue;
communication::rpc::Client client(kv.second, kCoordinationServerName);
communication::rpc::Client client(kv.second);
auto result = client.Call<StopWorkerRpc>();
CHECK(result) << "Failed to shut down worker: " << kv.first;
}

View File

@ -27,7 +27,7 @@ class MasterCoordination : public Coordination {
int RegisterWorker(int desired_worker_id, Endpoint endpoint);
public:
explicit MasterCoordination(communication::rpc::System &system);
explicit MasterCoordination(communication::rpc::Server &server);
/** Shuts down all the workers and this master server. */
~MasterCoordination();
@ -39,7 +39,7 @@ class MasterCoordination : public Coordination {
std::vector<int> GetWorkerIds() const override;
private:
communication::rpc::Server server_;
communication::rpc::Server &server_;
// Most master functions aren't thread-safe.
mutable std::mutex lock_;
std::unordered_map<int, Endpoint> workers_;

View File

@ -8,8 +8,6 @@
namespace distributed {
const std::string kCoordinationServerName = "CoordinationRpc";
using communication::rpc::Message;
using Endpoint = io::network::Endpoint;

View File

@ -9,15 +9,13 @@ namespace distributed {
using namespace std::literals::chrono_literals;
WorkerCoordination::WorkerCoordination(communication::rpc::System &system,
WorkerCoordination::WorkerCoordination(communication::rpc::Server &server,
const Endpoint &master_endpoint)
: system_(system),
client_pool_(master_endpoint, kCoordinationServerName),
server_(system_, kCoordinationServerName) {}
: server_(server), client_pool_(master_endpoint) {}
int WorkerCoordination::RegisterWorker(int desired_worker_id) {
auto result = client_pool_.Call<RegisterWorkerRpc>(desired_worker_id,
system_.endpoint());
server_.endpoint());
CHECK(result) << "Failed to RegisterWorker with the master";
return result->member;
}

View File

@ -14,7 +14,7 @@ class WorkerCoordination : public Coordination {
using Endpoint = io::network::Endpoint;
public:
WorkerCoordination(communication::rpc::System &system,
WorkerCoordination(communication::rpc::Server &server,
const Endpoint &master_endpoint);
/**
@ -37,9 +37,8 @@ class WorkerCoordination : public Coordination {
void WaitForShutdown();
private:
communication::rpc::System &system_;
communication::rpc::Server &server_;
communication::rpc::ClientPool client_pool_;
communication::rpc::Server server_;
mutable ConcurrentMap<int, Endpoint> endpoint_cache_;
};
} // namespace distributed

View File

@ -7,7 +7,6 @@
#include "distributed/serialization.hpp"
namespace distributed {
const std::string kIndexRpcName = "IndexRpc";
struct IndexLabelPropertyTx {
storage::Label label;

View File

@ -10,8 +10,8 @@ namespace distributed {
class IndexRpcServer {
public:
IndexRpcServer(database::GraphDb &db, communication::rpc::System &system)
: db_(db), system_(system) {
IndexRpcServer(database::GraphDb &db, communication::rpc::Server &server)
: db_(db), rpc_server_(server) {
rpc_server_.Register<BuildIndexRpc>([this](const BuildIndexReq &req) {
LabelPropertyIndex::Key key{req.member.label, req.member.property};
@ -36,8 +36,7 @@ class IndexRpcServer {
private:
database::GraphDb &db_;
communication::rpc::System &system_;
communication::rpc::Server rpc_server_{system_, kIndexRpcName};
communication::rpc::Server &rpc_server_;
};
} // namespace distributed

View File

@ -2,8 +2,8 @@
namespace distributed {
PlanConsumer::PlanConsumer(communication::rpc::System &system)
: server_(system, kDistributedPlanServerName) {
PlanConsumer::PlanConsumer(communication::rpc::Server &server)
: server_(server) {
server_.Register<DistributedPlanRpc>([this](const DispatchPlanReq &req) {
plan_cache_.access().insert(
req.plan_id_,

View File

@ -24,13 +24,13 @@ class PlanConsumer {
const AstTreeStorage storage;
};
explicit PlanConsumer(communication::rpc::System &system);
explicit PlanConsumer(communication::rpc::Server &server);
/** Return cached plan and symbol table for a given plan id. */
PlanPack &PlanForId(int64_t plan_id) const;
private:
communication::rpc::Server server_;
communication::rpc::Server &server_;
// TODO remove unique_ptr. This is to get it to work, emplacing into a
// ConcurrentMap is tricky.
mutable ConcurrentMap<int64_t, std::unique_ptr<PlanPack>> plan_cache_;

View File

@ -3,7 +3,7 @@
namespace distributed {
PlanDispatcher::PlanDispatcher(Coordination &coordination)
: clients_(coordination, kDistributedPlanServerName) {}
: clients_(coordination) {}
void PlanDispatcher::DispatchPlan(
int64_t plan_id, std::shared_ptr<query::plan::LogicalOperator> plan,

View File

@ -10,8 +10,6 @@
namespace distributed {
const std::string kDistributedPlanServerName = "DistributedPlanRpc";
using communication::rpc::Message;
using SymbolTable = query::SymbolTable;
using AstTreeStorage = query::AstTreeStorage;

View File

@ -14,8 +14,7 @@ namespace distributed {
/** Provides access to other worker's data. */
class RemoteDataRpcClients {
public:
RemoteDataRpcClients(Coordination &coordination)
: clients_(coordination, kRemoteDataRpcName) {}
RemoteDataRpcClients(Coordination &coordination) : clients_(coordination) {}
/// Returns a remote worker's data for the given params. That worker must own
/// the vertex for the given id, and that vertex must be visible in given

View File

@ -11,7 +11,6 @@
#include "transactions/type.hpp"
namespace distributed {
const std::string kRemoteDataRpcName = "RemoteDataRpc";
struct TxGidPair {
tx::transaction_id_t tx_id;

View File

@ -13,8 +13,8 @@ namespace distributed {
/** Serves this worker's data to others. */
class RemoteDataRpcServer {
public:
RemoteDataRpcServer(database::GraphDb &db, communication::rpc::System &system)
: db_(db), rpc_server_(system, kRemoteDataRpcName) {
RemoteDataRpcServer(database::GraphDb &db, communication::rpc::Server &server)
: db_(db), rpc_server_(server) {
rpc_server_.Register<RemoteVertexRpc>([this](const RemoteVertexReq &req) {
database::GraphDbAccessor dba(db_, req.member.tx_id);
auto vertex = dba.FindVertexChecked(req.member.gid, false);
@ -33,6 +33,6 @@ class RemoteDataRpcServer {
private:
database::GraphDb &db_;
communication::rpc::Server rpc_server_;
communication::rpc::Server &rpc_server_;
};
} // namespace distributed

View File

@ -133,10 +133,10 @@ class RemoteProduceRpcServer {
public:
RemoteProduceRpcServer(database::GraphDb &db, tx::Engine &tx_engine,
communication::rpc::System &system,
communication::rpc::Server &server,
const distributed::PlanConsumer &plan_consumer)
: db_(db),
remote_produce_rpc_server_(system, kRemotePullProduceRpcName),
remote_produce_rpc_server_(server),
plan_consumer_(plan_consumer),
tx_engine_(tx_engine) {
remote_produce_rpc_server_.Register<RemotePullRpc>(
@ -154,7 +154,7 @@ class RemoteProduceRpcServer {
private:
database::GraphDb &db_;
communication::rpc::Server remote_produce_rpc_server_;
communication::rpc::Server &remote_produce_rpc_server_;
const distributed::PlanConsumer &plan_consumer_;
std::map<std::pair<tx::transaction_id_t, int64_t>, OngoingProduce>

View File

@ -33,8 +33,6 @@ enum class RemotePullState {
QUERY_ERROR
};
const std::string kRemotePullProduceRpcName = "RemotePullProduceRpc";
struct RemotePullReq : public communication::rpc::Message {
RemotePullReq() {}
RemotePullReq(tx::transaction_id_t tx_id, tx::Snapshot tx_snapshot,

View File

@ -22,7 +22,7 @@ class RemotePullRpcClients {
public:
RemotePullRpcClients(Coordination &coordination)
: clients_(coordination, kRemotePullProduceRpcName) {}
: clients_(coordination) {}
/// Calls a remote pull asynchroniously. IMPORTANT: take care not to call this
/// function for the same (tx_id, worker_id, plan_id) before the previous call

View File

@ -21,7 +21,7 @@ namespace distributed {
class RemoteUpdatesRpcClients {
public:
explicit RemoteUpdatesRpcClients(distributed::Coordination &coordination)
: worker_clients_(coordination, kRemoteUpdatesRpc) {}
: worker_clients_(coordination) {}
/// Sends an update delta to the given worker.
RemoteUpdateResult RemoteUpdate(int worker_id,

View File

@ -13,8 +13,6 @@
namespace distributed {
const std::string kRemoteUpdatesRpc = "RemoteUpdatesRpc";
/// The result of sending or applying a deferred update to a worker.
enum class RemoteUpdateResult {
DONE,

View File

@ -143,8 +143,8 @@ class RemoteUpdatesRpcServer {
public:
RemoteUpdatesRpcServer(database::GraphDb &db, tx::Engine &engine,
communication::rpc::System &system)
: db_(db), engine_(engine), server_(system, kRemoteUpdatesRpc) {
communication::rpc::Server &server)
: db_(db), engine_(engine), server_(server) {
server_.Register<RemoteUpdateRpc>([this](const RemoteUpdateReq &req) {
using DeltaType = database::StateDelta::Type;
auto &delta = req.member;
@ -229,7 +229,7 @@ class RemoteUpdatesRpcServer {
private:
database::GraphDb &db_;
tx::Engine &engine_;
communication::rpc::Server server_;
communication::rpc::Server &server_;
tx::TxEndListener tx_end_listener_{engine_,
[this](tx::transaction_id_t tx_id) {
vertex_updates_.access().remove(tx_id);

View File

@ -14,9 +14,7 @@ namespace distributed {
* Thread safe. */
class RpcWorkerClients {
public:
RpcWorkerClients(Coordination &coordination,
const std::string &rpc_client_name)
: coordination_(coordination), rpc_client_name_(rpc_client_name) {}
RpcWorkerClients(Coordination &coordination) : coordination_(coordination) {}
RpcWorkerClients(const RpcWorkerClients &) = delete;
RpcWorkerClients(RpcWorkerClients &&) = delete;
@ -29,8 +27,7 @@ class RpcWorkerClients {
if (found != client_pools_.end()) return found->second;
return client_pools_
.emplace(std::piecewise_construct, std::forward_as_tuple(worker_id),
std::forward_as_tuple(coordination_.GetEndpoint(worker_id),
rpc_client_name_))
std::forward_as_tuple(coordination_.GetEndpoint(worker_id)))
.first->second;
}
@ -67,7 +64,6 @@ class RpcWorkerClients {
private:
// TODO make Coordination const, it's member GetEndpoint must be const too.
Coordination &coordination_;
const std::string rpc_client_name_;
std::unordered_map<int, communication::rpc::ClientPool> client_pools_;
std::mutex lock_;
};

View File

@ -38,7 +38,7 @@ DEFINE_VALIDATED_int32(port, 7687, "Communication port on which to listen.",
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
DEFINE_VALIDATED_int32(num_workers,
std::max(std::thread::hardware_concurrency(), 1U),
"Number of workers", FLAG_IN_RANGE(1, INT32_MAX));
"Number of workers (Bolt)", FLAG_IN_RANGE(1, INT32_MAX));
DEFINE_string(log_file, "", "Path to where the log should be stored.");
DEFINE_HIDDEN_string(
log_link_basename, "",

View File

@ -43,7 +43,7 @@ void StatsDispatchMain(const io::network::Endpoint &endpoint) {
LOG(INFO) << "Stats dispatcher thread started";
communication::rpc::Client client(endpoint, kStatsServiceName);
communication::rpc::Client client(endpoint);
BatchStatsReq batch_request;
batch_request.requests.reserve(MAX_BATCH_SIZE);

View File

@ -11,8 +11,6 @@
namespace stats {
static const std::string kStatsServiceName = "statsd-service";
/**
* Start sending metrics to StatsD server.
*

View File

@ -31,11 +31,11 @@ ID_VALUE_RPC_CALLS(Property)
template <typename TId>
MasterConcurrentIdMapper<TId>::MasterConcurrentIdMapper(
communication::rpc::System &system)
communication::rpc::Server &server)
// We have to make sure our rpc server name is unique with regards to type.
// Otherwise we will try to reuse the same rpc server name for different
// types (Label/EdgeType/Property)
: rpc_server_(system, impl::RpcServerNameFromType<TId>()) {
: rpc_server_(server) {
RegisterRpc(*this, rpc_server_);
}

View File

@ -12,9 +12,9 @@ namespace storage {
template <typename TId>
class MasterConcurrentIdMapper : public SingleNodeConcurrentIdMapper<TId> {
public:
explicit MasterConcurrentIdMapper(communication::rpc::System &system);
explicit MasterConcurrentIdMapper(communication::rpc::Server &server);
private:
communication::rpc::Server rpc_server_;
communication::rpc::Server &rpc_server_;
};
} // namespace storage

View File

@ -10,18 +10,6 @@
namespace storage {
const std::string kConcurrentIdMapperRpc = "ConcurrentIdMapper";
namespace impl {
/// Returns rpc server name by template type
template <typename TType>
std::string RpcServerNameFromType() {
return kConcurrentIdMapperRpc + "_" + typeid(TType).name();
}
}; // namespace impl
#define ID_VALUE_RPC(type) \
RPC_SINGLE_MEMBER_MESSAGE(type##IdReq, std::string); \
RPC_SINGLE_MEMBER_MESSAGE(type##IdRes, storage::type); \

View File

@ -32,7 +32,7 @@ ID_VALUE_RPC_CALLS(Property)
template <typename TId>
WorkerConcurrentIdMapper<TId>::WorkerConcurrentIdMapper(
const io::network::Endpoint &master_endpoint)
: rpc_client_pool_(master_endpoint, impl::RpcServerNameFromType<TId>()) {}
: rpc_client_pool_(master_endpoint) {}
template <typename TId>
TId WorkerConcurrentIdMapper<TId>::value_to_id(const std::string &value) {

View File

@ -9,9 +9,9 @@
namespace tx {
MasterEngine::MasterEngine(communication::rpc::System &system,
MasterEngine::MasterEngine(communication::rpc::Server &server,
durability::WriteAheadLog *wal)
: SingleNodeEngine(wal), rpc_server_(system, kTransactionEngineRpc) {
: SingleNodeEngine(wal), rpc_server_(server) {
rpc_server_.Register<BeginRpc>([this](const BeginReq &) {
auto tx = Begin();
return std::make_unique<BeginRes>(TxAndSnapshot{tx->id_, tx->snapshot()});

View File

@ -13,10 +13,10 @@ class MasterEngine : public SingleNodeEngine {
* @param wal - Optional. If present, the Engine will write tx
* Begin/Commit/Abort atomically (while under lock).
*/
MasterEngine(communication::rpc::System &system,
MasterEngine(communication::rpc::Server &server,
durability::WriteAheadLog *wal = nullptr);
private:
communication::rpc::Server rpc_server_;
communication::rpc::Server &rpc_server_;
};
} // namespace tx

View File

@ -7,8 +7,6 @@
namespace tx {
const std::string kTransactionEngineRpc = "transaction_engine_rpc";
RPC_NO_MEMBER_MESSAGE(BeginReq)
struct TxAndSnapshot {
transaction_id_t tx_id;

View File

@ -9,7 +9,7 @@
namespace tx {
WorkerEngine::WorkerEngine(const io::network::Endpoint &endpoint)
: rpc_client_pool_(endpoint, kTransactionEngineRpc) {
: rpc_client_pool_(endpoint) {
cache_clearing_scheduler_.Run(kCacheReleasePeriod, [this]() {
// Use the GC snapshot as it always has at least one member.
auto res = rpc_client_pool_.Call<GcSnapshotRpc>();

View File

@ -29,10 +29,8 @@ int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]);
// Initialize client.
Client client(
io::network::Endpoint(utils::ResolveHostname(FLAGS_server_interface),
FLAGS_server_port),
"main");
Client client(io::network::Endpoint(
utils::ResolveHostname(FLAGS_server_interface), FLAGS_server_port));
// Try to send 100 values to server.
// If requests timeout, try to resend it.

View File

@ -29,9 +29,7 @@ int main(int argc, char **argv) {
// Unhandled exception handler init.
std::set_terminate(&terminate_handler);
System server_system(
io::network::Endpoint(FLAGS_interface, stoul(FLAGS_port)));
Server server(server_system, "main");
Server server(io::network::Endpoint(FLAGS_interface, stoul(FLAGS_port)));
std::ofstream log(FLAGS_log, std::ios_base::app);
// Handler for regular termination signals.

View File

@ -42,8 +42,8 @@ int main(int argc, char *argv[]) {
{"b", Endpoint("127.0.0.1", 12346)},
{"c", Endpoint("127.0.0.1", 12347)}};
communication::rpc::System my_system(directory[FLAGS_member_id]);
RpcNetwork<DummyState> network(my_system, directory);
communication::rpc::Server server(directory[FLAGS_member_id]);
RpcNetwork<DummyState> network(server, directory);
raft::SimpleFileStorage<DummyState> storage(FLAGS_log_dir);
raft::RaftConfig config{{"a", "b", "c"}, 150ms, 300ms, 70ms, 30ms};

View File

@ -12,15 +12,15 @@ class DistributedConcurrentIdMapperTest : public ::testing::Test {
const std::string kLocal{"127.0.0.1"};
protected:
communication::rpc::System master_system_{{kLocal, 0}};
communication::rpc::Server master_server_{{kLocal, 0}};
std::experimental::optional<storage::MasterConcurrentIdMapper<TId>>
master_mapper_;
std::experimental::optional<storage::WorkerConcurrentIdMapper<TId>>
worker_mapper_;
void SetUp() override {
master_mapper_.emplace(master_system_);
worker_mapper_.emplace(master_system_.endpoint());
master_mapper_.emplace(master_server_);
worker_mapper_.emplace(master_server_.endpoint());
}
void TearDown() override {
worker_mapper_ = std::experimental::nullopt;

View File

@ -6,11 +6,11 @@
const std::string kLocal = "127.0.0.1";
TEST(CountersDistributed, All) {
communication::rpc::System master_sys({kLocal, 0});
database::MasterCounters master(master_sys);
communication::rpc::Server master_server({kLocal, 0});
database::MasterCounters master(master_server);
database::WorkerCounters w1(master_sys.endpoint());
database::WorkerCounters w2(master_sys.endpoint());
database::WorkerCounters w1(master_server.endpoint());
database::WorkerCounters w2(master_server.endpoint());
EXPECT_EQ(w1.Get("a"), 0);
EXPECT_EQ(w1.Get("a"), 1);

View File

@ -12,7 +12,7 @@
#include "distributed/coordination_worker.hpp"
#include "io/network/endpoint.hpp"
using communication::rpc::System;
using communication::rpc::Server;
using namespace distributed;
using namespace std::literals::chrono_literals;
@ -23,34 +23,34 @@ class WorkerInThread {
public:
WorkerInThread(io::network::Endpoint master_endpoint, int desired_id = -1) {
worker_thread_ = std::thread([this, master_endpoint, desired_id] {
system_.emplace(Endpoint(kLocal, 0));
coord_.emplace(*system_, master_endpoint);
server_.emplace(Endpoint(kLocal, 0));
coord_.emplace(*server_, master_endpoint);
worker_id_ = coord_->RegisterWorker(desired_id);
coord_->WaitForShutdown();
});
}
int worker_id() const { return worker_id_; }
auto endpoint() const { return system_->endpoint(); }
auto endpoint() const { return server_->endpoint(); }
auto worker_endpoint(int worker_id) { return coord_->GetEndpoint(worker_id); }
void join() { worker_thread_.join(); }
private:
std::thread worker_thread_;
std::experimental::optional<System> system_;
std::experimental::optional<Server> server_;
std::experimental::optional<WorkerCoordination> coord_;
std::atomic<int> worker_id_{0};
};
TEST(Distributed, Coordination) {
System master_system({kLocal, 0});
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerInThread>> workers;
{
MasterCoordination master_coord(master_system);
MasterCoordination master_coord(master_server);
for (int i = 0; i < kWorkerCount; ++i)
workers.emplace_back(
std::make_unique<WorkerInThread>(master_system.endpoint()));
std::make_unique<WorkerInThread>(master_server.endpoint()));
// Wait till all the workers are safely initialized.
std::this_thread::sleep_for(300ms);
@ -72,16 +72,16 @@ TEST(Distributed, Coordination) {
}
TEST(Distributed, DesiredAndUniqueId) {
System master_system({kLocal, 0});
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerInThread>> workers;
{
MasterCoordination master_coord(master_system);
MasterCoordination master_coord(master_server);
workers.emplace_back(
std::make_unique<WorkerInThread>(master_system.endpoint(), 42));
std::make_unique<WorkerInThread>(master_server.endpoint(), 42));
std::this_thread::sleep_for(200ms);
workers.emplace_back(
std::make_unique<WorkerInThread>(master_system.endpoint(), 42));
std::make_unique<WorkerInThread>(master_server.endpoint(), 42));
std::this_thread::sleep_for(200ms);
EXPECT_EQ(workers[0]->worker_id(), 42);
@ -92,16 +92,16 @@ TEST(Distributed, DesiredAndUniqueId) {
}
TEST(Distributed, CoordinationWorkersId) {
System master_system({kLocal, 0});
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerInThread>> workers;
{
MasterCoordination master_coord(master_system);
MasterCoordination master_coord(master_server);
workers.emplace_back(
std::make_unique<WorkerInThread>(master_system.endpoint(), 42));
std::make_unique<WorkerInThread>(master_server.endpoint(), 42));
std::this_thread::sleep_for(200ms);
workers.emplace_back(
std::make_unique<WorkerInThread>(master_system.endpoint(), 42));
std::make_unique<WorkerInThread>(master_server.endpoint(), 42));
std::this_thread::sleep_for(200ms);
std::vector<int> ids;

View File

@ -73,28 +73,26 @@ BOOST_CLASS_EXPORT(EchoMessage);
using Echo = RequestResponse<EchoMessage, EchoMessage>;
TEST(Rpc, Call) {
System server_system({"127.0.0.1", 0});
Server server(server_system, "main");
Server server({"127.0.0.1", 0});
server.Register<Sum>([](const SumReq &request) {
return std::make_unique<SumRes>(request.x + request.y);
});
std::this_thread::sleep_for(100ms);
Client client(server_system.endpoint(), "main");
Client client(server.endpoint());
auto sum = client.Call<Sum>(10, 20);
EXPECT_EQ(sum->sum, 30);
}
TEST(Rpc, Abort) {
System server_system({"127.0.0.1", 0});
Server server(server_system, "main");
Server server({"127.0.0.1", 0});
server.Register<Sum>([](const SumReq &request) {
std::this_thread::sleep_for(500ms);
return std::make_unique<SumRes>(request.x + request.y);
});
std::this_thread::sleep_for(100ms);
Client client(server_system.endpoint(), "main");
Client client(server.endpoint());
std::thread thread([&client]() {
std::this_thread::sleep_for(100ms);
@ -111,8 +109,7 @@ TEST(Rpc, Abort) {
}
TEST(Rpc, ClientPool) {
System server_system({"127.0.0.1", 0});
Server server(server_system, "main", 4);
Server server({"127.0.0.1", 0});
server.Register<Sum>([](const SumReq &request) {
std::this_thread::sleep_for(100ms);
return std::make_unique<SumRes>(request.x + request.y);
@ -120,7 +117,7 @@ TEST(Rpc, ClientPool) {
std::this_thread::sleep_for(100ms);
Client client(server_system.endpoint(), "main");
Client client(server.endpoint());
/* these calls should take more than 400ms because we're using a regular
* client */
@ -142,7 +139,7 @@ TEST(Rpc, ClientPool) {
EXPECT_GE(t1.Elapsed(), 400ms);
ClientPool pool(server_system.endpoint(), "main");
ClientPool pool(server.endpoint());
/* these calls shouldn't take much more that 100ms because they execute in
* parallel */
@ -163,8 +160,7 @@ TEST(Rpc, ClientPool) {
}
TEST(Rpc, LargeMessage) {
System server_system({"127.0.0.1", 0});
Server server(server_system, "main");
Server server({"127.0.0.1", 0});
server.Register<Echo>([](const EchoMessage &request) {
return std::make_unique<EchoMessage>(request.data);
});
@ -172,7 +168,7 @@ TEST(Rpc, LargeMessage) {
std::string testdata(100000, 'a');
Client client(server_system.endpoint(), "main");
Client client(server.endpoint());
auto echo = client.Call<Echo>(testdata);
EXPECT_EQ(echo->data, testdata);
}

View File

@ -31,21 +31,17 @@ class RpcWorkerClientsTest : public ::testing::Test {
protected:
const io::network::Endpoint kLocalHost{"127.0.0.1", 0};
const int kWorkerCount = 2;
const std::string kRpcName = "test_rpc";
void SetUp() override {
for (int i = 1; i <= kWorkerCount; ++i) {
workers_system_.emplace_back(
std::make_unique<communication::rpc::System>(kLocalHost));
workers_server_.emplace_back(
std::make_unique<communication::rpc::Server>(kLocalHost));
workers_coord_.emplace_back(
std::make_unique<distributed::WorkerCoordination>(
*workers_system_.back(), master_system_.endpoint()));
*workers_server_.back(), master_server_.endpoint()));
workers_coord_.back()->RegisterWorker(i);
workers_rpc_server_.emplace_back(
std::make_unique<communication::rpc::Server>(*workers_system_.back(),
kRpcName));
workers_rpc_server_.back()->Register<distributed::IncrementCounterRpc>(
workers_server_.back()->Register<distributed::IncrementCounterRpc>(
[this, i](const distributed::IncrementCounterReq &) {
workers_cnt_[i]++;
return std::make_unique<distributed::IncrementCounterRes>();
@ -65,16 +61,15 @@ class RpcWorkerClientsTest : public ::testing::Test {
for (auto &worker : wait_on_shutdown) worker.join();
}
std::vector<std::unique_ptr<communication::rpc::System>> workers_system_;
std::vector<std::unique_ptr<communication::rpc::Server>> workers_server_;
std::vector<std::unique_ptr<distributed::WorkerCoordination>> workers_coord_;
std::vector<std::unique_ptr<communication::rpc::Server>> workers_rpc_server_;
std::unordered_map<int, int> workers_cnt_;
communication::rpc::System master_system_{kLocalHost};
communication::rpc::Server master_server_{kLocalHost};
std::experimental::optional<distributed::MasterCoordination> master_coord_{
master_system_};
master_server_};
distributed::RpcWorkerClients rpc_workers_{*master_coord_, kRpcName};
distributed::RpcWorkerClients rpc_workers_{*master_coord_};
};
TEST_F(RpcWorkerClientsTest, GetWorkerIds) {

View File

@ -19,10 +19,10 @@ class WorkerEngineTest : public testing::Test {
protected:
const std::string local{"127.0.0.1"};
System master_system_{{local, 0}};
MasterEngine master_{master_system_};
Server master_server_{{local, 0}};
MasterEngine master_{master_server_};
WorkerEngine worker_{master_system_.endpoint()};
WorkerEngine worker_{master_server_.endpoint()};
};
TEST_F(WorkerEngineTest, BeginOnWorker) {

View File

@ -27,8 +27,7 @@ std::string GraphiteFormat(const stats::StatsReq &req) {
int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
communication::rpc::System system({FLAGS_interface, (uint16_t)FLAGS_port});
communication::rpc::Server server(system, stats::kStatsServiceName);
communication::rpc::Server server({FLAGS_interface, (uint16_t)FLAGS_port});
io::network::Socket graphite_socket;