diff --git a/src/distributed/rpc_worker_clients.hpp b/src/distributed/rpc_worker_clients.hpp index a0db09c1c..e5ed4e669 100644 --- a/src/distributed/rpc_worker_clients.hpp +++ b/src/distributed/rpc_worker_clients.hpp @@ -1,5 +1,9 @@ #pragma once +#include <functional> +#include <future> +#include <type_traits> + #include "communication/messaging/distributed.hpp" #include "communication/rpc/rpc.hpp" #include "distributed/coordination.hpp" @@ -36,6 +40,28 @@ class RpcWorkerClients { auto GetWorkerIds() { return coordination_.GetWorkerIds(); } + /** + * Promises to execute function on workers rpc clients. + * @Tparam TResult - deduced automatically from method + * @param skip_worker_id - worker which to skip (set to -1 to avoid skipping) + * @param execute - Method which takes an rpc client and returns a result for + * it + * @return list of futures filled with function 'execute' results when applied + * to rpc clients + */ + template <typename TResult> + auto ExecuteOnWorkers( + int skip_worker_id, + std::function<TResult(communication::rpc::Client &)> execute) { + std::vector<std::future<TResult>> futures; + for (auto &client : clients_) { + if (client.first == skip_worker_id) continue; + futures.emplace_back( + std::async(std::launch::async, execute(client.second))); + } + return futures; + } + private: communication::messaging::System &system_; // TODO make Coordination const, it's member GetEndpoint must be const too.