Rpc client locking

Summary:
Rpc client wasn't thread safe and required a lock before each rpc call.
The locking functionality is now incorporated in Rpc client.

Reviewers: mislav.bradac

Reviewed By: mislav.bradac

Differential Revision: https://phabricator.memgraph.io/D1056
This commit is contained in:
Dominik Gleich 2017-12-13 16:11:52 +01:00
parent 581404a7b6
commit a8488ac497
5 changed files with 10 additions and 13 deletions

View File

@ -128,6 +128,6 @@ void Server::Shutdown() {
alive_ = false; alive_ = false;
stream_->Shutdown(); stream_->Shutdown();
} }
} } // namespace communication::rpc
CEREAL_REGISTER_TYPE(communication::rpc::Request); CEREAL_REGISTER_TYPE(communication::rpc::Request);
CEREAL_REGISTER_TYPE(communication::rpc::Response); CEREAL_REGISTER_TYPE(communication::rpc::Response);

View File

@ -12,7 +12,7 @@ struct RequestResponse {
using Response = TResponse; using Response = TResponse;
}; };
// Client is not thread safe. // Client is thread safe.
class Client { class Client {
public: public:
Client(messaging::System &system, const std::string &address, uint16_t port, Client(messaging::System &system, const std::string &address, uint16_t port,
@ -30,6 +30,7 @@ class Client {
"TRequestResponse::Request must be derived from Message"); "TRequestResponse::Request must be derived from Message");
static_assert(std::is_base_of<messaging::Message, Res>::value, static_assert(std::is_base_of<messaging::Message, Res>::value,
"TRequestResponse::Response must be derived from Message"); "TRequestResponse::Response must be derived from Message");
std::lock_guard<std::mutex> lock(lock_);
auto response = auto response =
Call(timeout, std::unique_ptr<messaging::Message>( Call(timeout, std::unique_ptr<messaging::Message>(
std::make_unique<Req>(std::forward<Args>(args)...))); std::make_unique<Req>(std::forward<Args>(args)...)));
@ -50,6 +51,7 @@ class Client {
messaging::System &system_; messaging::System &system_;
messaging::Writer writer_; messaging::Writer writer_;
std::shared_ptr<messaging::EventStream> stream_; std::shared_ptr<messaging::EventStream> stream_;
std::mutex lock_;
}; };
class Server { class Server {
@ -68,8 +70,9 @@ class Server {
typename TRequestResponse::Response>::value, typename TRequestResponse::Response>::value,
"TRequestResponse::Response must be derived from Message"); "TRequestResponse::Response must be derived from Message");
auto got = callbacks_.emplace( auto got = callbacks_.emplace(
typeid(typename TRequestResponse::Request), typeid(typename TRequestResponse::Request), [callback = callback](
[callback = callback](const messaging::Message &base_message) { const messaging::Message
&base_message) {
const auto &message = const auto &message =
dynamic_cast<const typename TRequestResponse::Request &>( dynamic_cast<const typename TRequestResponse::Request &>(
base_message); base_message);
@ -90,4 +93,4 @@ class Server {
callbacks_; callbacks_;
std::atomic<bool> alive_{true}; std::atomic<bool> alive_{true};
}; };
} } // namespace communication::rpc

View File

@ -15,7 +15,6 @@ WorkerEngine::WorkerEngine(communication::messaging::System &system,
: rpc_client_(system, tx_server_host, tx_server_port, "tx_engine") {} : rpc_client_(system, tx_server_host, tx_server_port, "tx_engine") {}
Transaction *WorkerEngine::LocalBegin(transaction_id_t tx_id) { Transaction *WorkerEngine::LocalBegin(transaction_id_t tx_id) {
std::lock_guard<std::mutex> guard(rpc_client_lock_);
auto accessor = active_.access(); auto accessor = active_.access();
auto found = accessor.find(tx_id); auto found = accessor.find(tx_id);
if (found != accessor.end()) return found->second; if (found != accessor.end()) return found->second;
@ -30,7 +29,6 @@ Transaction *WorkerEngine::LocalBegin(transaction_id_t tx_id) {
} }
CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
std::lock_guard<std::mutex> guard(rpc_client_lock_);
auto info = clog_.fetch_info(tid); auto info = clog_.fetch_info(tid);
// If we don't know the transaction to be commited nor aborted, ask the // If we don't know the transaction to be commited nor aborted, ask the
// master about it and update the local commit log. // master about it and update the local commit log.
@ -50,18 +48,15 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
} }
Snapshot WorkerEngine::GlobalGcSnapshot() { Snapshot WorkerEngine::GlobalGcSnapshot() {
std::lock_guard<std::mutex> guard(rpc_client_lock_);
return std::move(rpc_client_.Call<GcSnapshotRpc>(kRpcTimeout)->member); return std::move(rpc_client_.Call<GcSnapshotRpc>(kRpcTimeout)->member);
} }
Snapshot WorkerEngine::GlobalActiveTransactions() { Snapshot WorkerEngine::GlobalActiveTransactions() {
std::lock_guard<std::mutex> guard(rpc_client_lock_);
return std::move( return std::move(
rpc_client_.Call<ActiveTransactionsRpc>(kRpcTimeout)->member); rpc_client_.Call<ActiveTransactionsRpc>(kRpcTimeout)->member);
} }
bool WorkerEngine::GlobalIsActive(transaction_id_t tid) const { bool WorkerEngine::GlobalIsActive(transaction_id_t tid) const {
std::lock_guard<std::mutex> guard(rpc_client_lock_);
return rpc_client_.Call<IsActiveRpc>(kRpcTimeout, tid)->member; return rpc_client_.Call<IsActiveRpc>(kRpcTimeout, tid)->member;
} }

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include <mutex>
#include <atomic> #include <atomic>
#include <mutex>
#include "communication/messaging/distributed.hpp" #include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp" #include "communication/rpc/rpc.hpp"
@ -36,6 +36,5 @@ class WorkerEngine : public Engine {
// Communication to the transactional master. // Communication to the transactional master.
mutable communication::rpc::Client rpc_client_; mutable communication::rpc::Client rpc_client_;
mutable std::mutex rpc_client_lock_;
}; };
} // namespace tx } // namespace tx

View File

@ -7,8 +7,8 @@
#include "communication/rpc/rpc.hpp" #include "communication/rpc/rpc.hpp"
#include "gtest/gtest.h" #include "gtest/gtest.h"
using communication::messaging::System;
using communication::messaging::Message; using communication::messaging::Message;
using communication::messaging::System;
using namespace communication::rpc; using namespace communication::rpc;
using namespace std::literals::chrono_literals; using namespace std::literals::chrono_literals;