diff --git a/src/database/counters.cpp b/src/database/counters.cpp index 62aa2616f..5ee380ddc 100644 --- a/src/database/counters.cpp +++ b/src/database/counters.cpp @@ -47,14 +47,14 @@ WorkerCounters::WorkerCounters( int64_t WorkerCounters::Get(const std::string &name) { auto response = master_client_pool_.Call(name); - CHECK(response) << "CountersGetRpc - failed to get response from master"; + CHECK(response) << "CountersGetRpc failed"; return response->member; } void WorkerCounters::Set(const std::string &name, int64_t value) { auto response = master_client_pool_.Call(CountersSetReqData{name, value}); - CHECK(response) << "CountersSetRpc - failed to get response from master"; + CHECK(response) << "CountersSetRpc failed"; } } // namespace database diff --git a/src/distributed/coordination_master.cpp b/src/distributed/coordination_master.cpp index 671372f17..8aba091dc 100644 --- a/src/distributed/coordination_master.cpp +++ b/src/distributed/coordination_master.cpp @@ -45,7 +45,7 @@ MasterCoordination::~MasterCoordination() { if (kv.first == 0) continue; communication::rpc::Client client(kv.second); auto result = client.Call(); - CHECK(result) << "Failed to shut down worker: " << kv.first; + CHECK(result) << "StopWorkerRpc failed work worker: " << kv.first; } } diff --git a/src/distributed/coordination_worker.cpp b/src/distributed/coordination_worker.cpp index 9de20047e..e65ed0dc6 100644 --- a/src/distributed/coordination_worker.cpp +++ b/src/distributed/coordination_worker.cpp @@ -16,7 +16,7 @@ WorkerCoordination::WorkerCoordination(communication::rpc::Server &server, int WorkerCoordination::RegisterWorker(int desired_worker_id) { auto result = client_pool_.Call(desired_worker_id, server_.endpoint()); - CHECK(result) << "Failed to RegisterWorker with the master"; + CHECK(result) << "RegisterWorkerRpc failed"; return result->member; } @@ -25,7 +25,7 @@ Endpoint WorkerCoordination::GetEndpoint(int worker_id) { auto found = accessor.find(worker_id); if (found != accessor.end()) return found->second; auto result = client_pool_.Call(worker_id); - CHECK(result) << "Failed to GetEndpoint from the master"; + CHECK(result) << "GetEndpointRpc failed"; accessor.insert(worker_id, result->member); return result->member; } diff --git a/src/distributed/plan_dispatcher.cpp b/src/distributed/plan_dispatcher.cpp index 0f000ea3a..090512303 100644 --- a/src/distributed/plan_dispatcher.cpp +++ b/src/distributed/plan_dispatcher.cpp @@ -12,7 +12,7 @@ void PlanDispatcher::DispatchPlan( symbol_table](communication::rpc::ClientPool &client_pool) { auto result = client_pool.Call(plan_id, plan, symbol_table); - CHECK(result) << "Failed to dispatch plan to worker"; + CHECK(result) << "DistributedPlanRpc failed"; }); for (auto &future : futures) { diff --git a/src/distributed/remote_data_rpc_clients.hpp b/src/distributed/remote_data_rpc_clients.hpp index f6cc24d57..675020119 100644 --- a/src/distributed/remote_data_rpc_clients.hpp +++ b/src/distributed/remote_data_rpc_clients.hpp @@ -24,6 +24,7 @@ class RemoteDataRpcClients { gid::Gid gid) { auto response = clients_.GetClientPool(worker_id).Call( TxGidPair{tx_id, gid}); + CHECK(response) << "RemoteVertexRpc failed"; return std::move(response->name_output_); } @@ -34,6 +35,7 @@ class RemoteDataRpcClients { gid::Gid gid) { auto response = clients_.GetClientPool(worker_id).Call( TxGidPair{tx_id, gid}); + CHECK(response) << "RemoteEdgeRpc failed"; return std::move(response->name_output_); } diff --git a/src/distributed/remote_pull_rpc_clients.hpp b/src/distributed/remote_pull_rpc_clients.hpp index 691625e74..aa7eb99d9 100644 --- a/src/distributed/remote_pull_rpc_clients.hpp +++ b/src/distributed/remote_pull_rpc_clients.hpp @@ -90,7 +90,8 @@ class RemotePullRpcClients { std::vector> NotifyAllTransactionCommandAdvanced( tx::transaction_id_t tx_id) { return clients_.ExecuteOnWorkers(0, [tx_id](auto &client) { - client.template Call(tx_id); + auto res = client.template Call(tx_id); + CHECK(res) << "TransactionCommandAdvanceRpc failed"; }); } diff --git a/src/distributed/remote_updates_rpc_clients.hpp b/src/distributed/remote_updates_rpc_clients.hpp index f7b803698..ba4e71cf1 100644 --- a/src/distributed/remote_updates_rpc_clients.hpp +++ b/src/distributed/remote_updates_rpc_clients.hpp @@ -27,9 +27,10 @@ class RemoteUpdatesRpcClients { /// Sends an update delta to the given worker. RemoteUpdateResult RemoteUpdate(int worker_id, const database::StateDelta &delta) { - return worker_clients_.GetClientPool(worker_id) - .Call(delta) - ->member; + auto res = + worker_clients_.GetClientPool(worker_id).Call(delta); + CHECK(res) << "RemoteUpdateRpc failed on worker: " << worker_id; + return res->member; } /// Creates a vertex on the given worker and returns it's id. @@ -38,14 +39,13 @@ class RemoteUpdatesRpcClients { const std::vector &labels, const std::unordered_map &properties) { - auto result = + auto res = worker_clients_.GetClientPool(worker_id).Call( RemoteCreateVertexReqData{tx_id, labels, properties}); - CHECK(result) << "Failed to remote-create a vertex on worker: " - << worker_id; - CHECK(result->member.result == RemoteUpdateResult::DONE) - << "Vertex creation can not result in an error"; - return result->member.gid; + CHECK(res) << "RemoteCreateVertexRpc failed on worker: " << worker_id; + CHECK(res->member.result == RemoteUpdateResult::DONE) + << "Remote Vertex creation result not RemoteUpdateResult::DONE"; + return res->member.gid; } /// Creates an edge on the given worker and returns it's address. If the `to` @@ -64,7 +64,7 @@ class RemoteUpdatesRpcClients { auto res = worker_clients_.GetClientPool(from_worker) .Call(RemoteCreateEdgeReqData{ from.gid(), to.GlobalAddress(), edge_type, tx_id}); - CHECK(res) << "RemoteCreateEdge RPC failed"; + CHECK(res) << "RemoteCreateEdge RPC failed on worker: " << from_worker; RaiseIfRemoteError(res->member.result); return {res->member.gid, from_worker}; } @@ -78,11 +78,12 @@ class RemoteUpdatesRpcClients { (from.GlobalAddress().worker_id() != to.address().worker_id())) << "RemoteAddInEdge should only be called when `to` is remote and " "`from` is not on the same worker as `to`."; - auto res = worker_clients_.GetClientPool(to.GlobalAddress().worker_id()) - .Call(RemoteAddInEdgeReqData{ - from.GlobalAddress(), edge_address, to.gid(), edge_type, - tx_id}); - CHECK(res) << "RemoteAddInEdge RPC failed"; + auto worker_id = to.GlobalAddress().worker_id(); + auto res = + worker_clients_.GetClientPool(worker_id).Call( + RemoteAddInEdgeReqData{from.GlobalAddress(), edge_address, to.gid(), + edge_type, tx_id}); + CHECK(res) << "RemoteAddInEdge RPC failed on worker: " << worker_id; RaiseIfRemoteError(res->member); } @@ -91,7 +92,7 @@ class RemoteUpdatesRpcClients { auto res = worker_clients_.GetClientPool(worker_id).Call( RemoteRemoveVertexReqData{gid, tx_id, check_empty}); - CHECK(res) << "RemoteRemoveVertex RPC failed"; + CHECK(res) << "RemoteRemoveVertex RPC failed on worker: " << worker_id; RaiseIfRemoteError(res->member); } @@ -107,7 +108,7 @@ class RemoteUpdatesRpcClients { worker_clients_.GetClientPool(worker_id).Call( RemoteRemoveEdgeData{tx_id, edge_gid, vertex_from_id, vertex_to_addr}); - CHECK(res) << "RemoteRemoveEdge RPC failed"; + CHECK(res) << "RemoteRemoveEdge RPC failed on worker: " << worker_id; RaiseIfRemoteError(res->member); } @@ -119,26 +120,19 @@ class RemoteUpdatesRpcClients { auto res = worker_clients_.GetClientPool(worker_id).Call( RemoteRemoveInEdgeData{tx_id, vertex_id, edge_address}); - CHECK(res) << "RemoteRemoveInEdge RPC failed"; + CHECK(res) << "RemoteRemoveInEdge RPC failed on worker: " << worker_id; RaiseIfRemoteError(res->member); } - /// Calls for the worker with the given ID to apply remote updates. Returns - /// the results of that operation. - RemoteUpdateResult RemoteUpdateApply(int worker_id, - tx::transaction_id_t tx_id) { - return worker_clients_.GetClientPool(worker_id) - .Call(tx_id) - ->member; - } - /// Calls for all the workers (except the given one) to apply their updates /// and returns the future results. std::vector> RemoteUpdateApplyAll( int skip_worker_id, tx::transaction_id_t tx_id) { return worker_clients_.ExecuteOnWorkers( skip_worker_id, [tx_id](auto &client) { - return client.template Call(tx_id)->member; + auto res = client.template Call(tx_id); + CHECK(res) << "RemoteUpdateApplyRpc failed"; + return res->member; }); } diff --git a/src/stats/stats.cpp b/src/stats/stats.cpp index 276f32b1c..2768f30d2 100644 --- a/src/stats/stats.cpp +++ b/src/stats/stats.cpp @@ -56,7 +56,7 @@ void StatsDispatchMain(const io::network::Endpoint &endpoint) { size_t sent = 0, total = 0; auto flush_batch = [&] { - if (auto rep = client.Call(batch_request)) { + if (client.Call(batch_request)) { sent += batch_request.requests.size(); } total += batch_request.requests.size(); diff --git a/src/storage/concurrent_id_mapper_worker.cpp b/src/storage/concurrent_id_mapper_worker.cpp index be78c0d7c..85902702c 100644 --- a/src/storage/concurrent_id_mapper_worker.cpp +++ b/src/storage/concurrent_id_mapper_worker.cpp @@ -6,20 +6,20 @@ namespace storage { -#define ID_VALUE_RPC_CALLS(type) \ - template <> \ - type WorkerConcurrentIdMapper::RpcValueToId( \ - const std::string &value) { \ - auto response = master_client_pool_.Call(value); \ - CHECK(response) << ("Failed to obtain " #type " from master"); \ - return response->member; \ - } \ - \ - template <> \ - std::string WorkerConcurrentIdMapper::RpcIdToValue(type id) { \ - auto response = master_client_pool_.Call(id); \ - CHECK(response) << ("Failed to obtain " #type " value from master"); \ - return response->member; \ +#define ID_VALUE_RPC_CALLS(type) \ + template <> \ + type WorkerConcurrentIdMapper::RpcValueToId( \ + const std::string &value) { \ + auto response = master_client_pool_.Call(value); \ + CHECK(response) << (#type "IdRpc failed"); \ + return response->member; \ + } \ + \ + template <> \ + std::string WorkerConcurrentIdMapper::RpcIdToValue(type id) { \ + auto response = master_client_pool_.Call(id); \ + CHECK(response) << ("Id" #type "Rpc failed"); \ + return response->member; \ } using namespace storage; diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index 0381910d9..2806b975e 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -18,7 +18,9 @@ WorkerEngine::~WorkerEngine() { } Transaction *WorkerEngine::Begin() { - auto data = master_client_pool_.Call()->member; + auto res = master_client_pool_.Call(); + CHECK(res) << "BeginRpc failed"; + auto &data = res->member; UpdateOldestActive(data.snapshot, data.tx_id); Transaction *tx = new Transaction(data.tx_id, data.snapshot, *this); auto insertion = active_.access().insert(data.tx_id, tx); @@ -28,6 +30,7 @@ Transaction *WorkerEngine::Begin() { command_id_t WorkerEngine::Advance(transaction_id_t tx_id) { auto res = master_client_pool_.Call(tx_id); + CHECK(res) << "AdvanceRpc failed"; auto access = active_.access(); auto found = access.find(tx_id); CHECK(found != access.end()) @@ -37,7 +40,9 @@ command_id_t WorkerEngine::Advance(transaction_id_t tx_id) { } command_id_t WorkerEngine::UpdateCommand(transaction_id_t tx_id) { - command_id_t cmd_id = master_client_pool_.Call(tx_id)->member; + auto res = master_client_pool_.Call(tx_id); + CHECK(res) << "CommandRpc failed"; + auto cmd_id = res->member; // Assume there is no concurrent work being done on this worker in the given // transaction. This assumption is sound because command advancing needs to be @@ -56,11 +61,13 @@ command_id_t WorkerEngine::UpdateCommand(transaction_id_t tx_id) { void WorkerEngine::Commit(const Transaction &t) { auto res = master_client_pool_.Call(t.id_); + CHECK(res) << "CommitRpc failed"; ClearSingleTransaction(t.id_); } void WorkerEngine::Abort(const Transaction &t) { auto res = master_client_pool_.Call(t.id_); + CHECK(res) << "AbortRpc failed"; ClearSingleTransaction(t.id_); } @@ -71,7 +78,9 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { if (!(info.is_aborted() || info.is_committed())) { // @review: this version of Call is just used because Info has no // default constructor. - info = master_client_pool_.Call(tid)->member; + auto res = master_client_pool_.Call(tid); + CHECK(res) << "ClogInfoRpc failed"; + info = res->member; if (!info.is_active()) { if (info.is_committed()) clog_.set_committed(tid); if (info.is_aborted()) clog_.set_aborted(tid); @@ -83,14 +92,17 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { } Snapshot WorkerEngine::GlobalGcSnapshot() { - auto snapshot = std::move(master_client_pool_.Call()->member); + auto res = master_client_pool_.Call(); + CHECK(res) << "GcSnapshotRpc failed"; + auto snapshot = std::move(res->member); UpdateOldestActive(snapshot, local_last_.load()); return snapshot; } Snapshot WorkerEngine::GlobalActiveTransactions() { - auto snapshot = - std::move(master_client_pool_.Call()->member); + auto res = master_client_pool_.Call(); + CHECK(res) << "ActiveTransactionsRpc failed"; + auto snapshot = std::move(res->member); UpdateOldestActive(snapshot, local_last_.load()); return snapshot; } @@ -111,8 +123,9 @@ Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id) { auto found = accessor.find(tx_id); if (found != accessor.end()) return found->second; - auto snapshot = - std::move(master_client_pool_.Call(tx_id)->member); + auto res = master_client_pool_.Call(tx_id); + CHECK(res) << "SnapshotRpc failed"; + auto snapshot = std::move(res->member); UpdateOldestActive(snapshot, local_last_.load()); return RunningTransaction(tx_id, snapshot); } diff --git a/tests/unit/rpc.cpp b/tests/unit/rpc.cpp index f3d449c25..d22bbeee5 100644 --- a/tests/unit/rpc.cpp +++ b/tests/unit/rpc.cpp @@ -81,6 +81,7 @@ TEST(Rpc, Call) { Client client(server.endpoint()); auto sum = client.Call(10, 20); + ASSERT_TRUE(sum); EXPECT_EQ(sum->sum, 30); } @@ -102,7 +103,7 @@ TEST(Rpc, Abort) { utils::Timer timer; auto sum = client.Call(10, 20); - EXPECT_EQ(sum, nullptr); + EXPECT_FALSE(sum); EXPECT_LT(timer.Elapsed(), 200ms); thread.join(); @@ -116,14 +117,13 @@ TEST(Rpc, ClientPool) { }); std::this_thread::sleep_for(100ms); - Client client(server.endpoint()); /* these calls should take more than 400ms because we're using a regular * client */ auto get_sum_client = [&client](int x, int y) { auto sum = client.Call(x, y); - ASSERT_TRUE(sum != nullptr); + ASSERT_TRUE(sum); EXPECT_EQ(sum->sum, x + y); }; @@ -145,7 +145,7 @@ TEST(Rpc, ClientPool) { * parallel */ auto get_sum = [&pool](int x, int y) { auto sum = pool.Call(x, y); - ASSERT_TRUE(sum != nullptr); + ASSERT_TRUE(sum); EXPECT_EQ(sum->sum, x + y); }; @@ -170,5 +170,6 @@ TEST(Rpc, LargeMessage) { Client client(server.endpoint()); auto echo = client.Call(testdata); + ASSERT_TRUE(echo); EXPECT_EQ(echo->data, testdata); } diff --git a/tests/unit/rpc_worker_clients.cpp b/tests/unit/rpc_worker_clients.cpp index b5e0c1163..02a784308 100644 --- a/tests/unit/rpc_worker_clients.cpp +++ b/tests/unit/rpc_worker_clients.cpp @@ -86,8 +86,7 @@ TEST_F(RpcWorkerClientsTest, GetClientPool) { TEST_F(RpcWorkerClientsTest, ExecuteOnWorker) { auto execute = [](auto &client) -> void { - ASSERT_NE(client.template Call(), - nullptr); + ASSERT_TRUE(client.template Call()); }; rpc_workers_.ExecuteOnWorker(1, execute).get(); @@ -98,8 +97,7 @@ TEST_F(RpcWorkerClientsTest, ExecuteOnWorker) { TEST_F(RpcWorkerClientsTest, ExecuteOnWorkers) { auto execute = [](auto &client) -> void { - ASSERT_NE(client.template Call(), - nullptr); + ASSERT_TRUE(client.template Call()); }; // Skip master