Check if connection alive & fix pure virtual bug

Reviewers: mferencevic, msantl, florijan

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1353
This commit is contained in:
Dominik Gleich 2018-04-11 16:05:49 +02:00
parent 20d667f8bf
commit 87dbe26038
11 changed files with 80 additions and 40 deletions

View File

@ -234,13 +234,6 @@ class Worker : public PrivateBase {
return produce_server_;
}
~Worker() {
// The server is stopped explicitly here to disable RPC calls during the
// destruction of this object. This works because this destructor is called
// before the destructors of all objects.
server_.StopProcessingCalls();
}
communication::rpc::Server server_{
config_.worker_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
distributed::WorkerCoordination coordination_{server_,

View File

@ -49,7 +49,7 @@ class StorageGc {
[this] { CollectGarbage(); });
}
~StorageGc() {
virtual ~StorageGc() {
scheduler_.Stop();
edges_.record_deleter_.FreeExpiredObjects(tx::Transaction::MaxId());
@ -141,12 +141,12 @@ class StorageGc {
}
tx::Engine &tx_engine_;
Scheduler scheduler_;
private:
Storage &storage_;
MvccDeleter<Vertex> vertices_;
MvccDeleter<Edge> edges_;
Scheduler scheduler_;
// History of <oldest active transaction, next transaction to be ran> ranges
// that gc operated on at some previous time - used to clear commit log

View File

@ -24,12 +24,19 @@ class StorageGcMaster : public StorageGc {
});
}
~StorageGcMaster() {
// We have to stop scheduler before destroying this class because otherwise
// a task might try to utilize methods in this class which might cause pure
// virtual method called since they are not implemented for the base class.
scheduler_.Stop();
}
void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) final {
// Workers are sending information when it's safe to delete every
// transaction older than oldest_active from their perspective i.e. there
// won't exist another transaction in the future with id larger than or
// equal to oldest_active that might trigger a query into a commit log about
// the state of transactions which we are deleting
// the state of transactions which we are deleting.
auto safe_transaction = GetClogSafeTransaction(oldest_active);
if (safe_transaction) {
tx::transaction_id_t min_safe = *safe_transaction;

View File

@ -7,6 +7,13 @@ class StorageGcSingleNode : public StorageGc {
public:
using StorageGc::StorageGc;
~StorageGcSingleNode() {
// We have to stop scheduler before destroying this class because otherwise
// a task might try to utilize methods in this class which might cause pure
// virtual method called since they are not implemented for the base class.
scheduler_.Stop();
}
void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) final {
auto safe_to_delete = GetClogSafeTransaction(oldest_active);
if (safe_to_delete) tx_engine_.GarbageCollectCommitLog(*safe_to_delete);

View File

@ -17,6 +17,13 @@ class StorageGcWorker : public StorageGc {
master_client_pool_(master_client_pool),
worker_id_(worker_id) {}
~StorageGcWorker() {
// We have to stop scheduler before destroying this class because otherwise
// a task might try to utilize methods in this class which might cause pure
// virtual method called since they are not implemented for the base class.
scheduler_.Stop();
}
void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) final {
auto safe_to_delete = GetClogSafeTransaction(oldest_active);
if (safe_to_delete) {

View File

@ -6,6 +6,7 @@
#include "communication/rpc/client.hpp"
#include "distributed/coordination_master.hpp"
#include "distributed/coordination_rpc_messages.hpp"
#include "utils/network.hpp"
namespace distributed {
@ -46,8 +47,13 @@ MasterCoordination::~MasterCoordination() {
CHECK(result) << "StopWorkerRpc failed for worker: " << kv.first;
}
// Make sure all StopWorkerRpc request/response are exchanged.
std::this_thread::sleep_for(2s);
// Make sure all workers have died.
for (const auto &kv : workers) {
// Skip master (self).
if (kv.first == 0) continue;
while (utils::CanEstablishConnection(kv.second))
std::this_thread::sleep_for(0.5s);
}
}
} // namespace distributed

View File

@ -37,12 +37,7 @@ void WorkerCoordination::WaitForShutdown() {
std::unique_lock<std::mutex> lk(mutex);
cv.wait(lk, [&shutdown] { return shutdown; });
// Sleep to allow the server to return the StopWorker response. This is
// necessary because Shutdown will most likely be called after this function.
// TODO (review): Should we call server_.Shutdown() here? Not the usual
// convention, but maybe better...
std::this_thread::sleep_for(100ms);
};
}
Endpoint WorkerCoordination::GetEndpoint(int worker_id) {
std::lock_guard<std::mutex> guard(lock_);

View File

@ -10,6 +10,8 @@
#include "glog/logging.h"
#include "io/network/socket.hpp"
namespace utils {
/// Resolves hostname to ip, if already an ip, just returns it
@ -51,4 +53,9 @@ std::experimental::optional<std::string> GetHostname() {
return std::string(hostname);
}
bool CanEstablishConnection(const io::network::Endpoint &endpoint) {
io::network::Socket client;
return client.Connect(endpoint);
}
}; // namespace utils

View File

@ -3,6 +3,8 @@
#include <experimental/optional>
#include <string>
#include "io/network/endpoint.hpp"
namespace utils {
/// Resolves hostname to ip, if already an ip, just returns it
@ -11,4 +13,6 @@ std::string ResolveHostname(std::string hostname);
/// Gets hostname
std::experimental::optional<std::string> GetHostname();
// Try to establish a connection to a remote host
bool CanEstablishConnection(const io::network::Endpoint &endpoint);
}; // namespace utils

View File

@ -17,8 +17,8 @@
#include "distributed/rpc_worker_clients.hpp"
#include "io/network/endpoint.hpp"
using communication::rpc::Server;
using communication::rpc::ClientPool;
using communication::rpc::Server;
using namespace distributed;
using namespace std::literals::chrono_literals;
@ -26,40 +26,44 @@ const int kWorkerCount = 5;
const std::string kLocal = "127.0.0.1";
class WorkerCoordinationInThread {
struct Worker {
Worker(Endpoint master_endpoint) : master_endpoint(master_endpoint) {}
Endpoint master_endpoint;
Server server{{kLocal, 0}};
WorkerCoordination coord{server, master_endpoint};
ClientPool client_pool{master_endpoint};
ClusterDiscoveryWorker discovery{server, coord, client_pool};
std::atomic<int> worker_id_{0};
};
public:
WorkerCoordinationInThread(io::network::Endpoint master_endpoint,
int desired_id) {
int desired_id = -1) {
std::atomic<bool> init_done{false};
worker_thread_ =
std::thread([this, master_endpoint, desired_id, &init_done] {
server_.emplace(Endpoint(kLocal, 0));
coord_.emplace(*server_, master_endpoint);
client_pool_.emplace(master_endpoint);
discovery_.emplace(*server_, *coord_, *client_pool_);
// Try and register the worker with the desired id. If another worker
// is already using the desired id it will exit here.
discovery_->RegisterWorker(desired_id);
worker_id_ = desired_id;
worker.emplace(master_endpoint);
worker->discovery.RegisterWorker(desired_id);
worker->worker_id_ = desired_id;
init_done = true;
coord_->WaitForShutdown();
worker->coord.WaitForShutdown();
worker = std::experimental::nullopt;
});
while (!init_done) std::this_thread::sleep_for(10ms);
}
int worker_id() const { return worker_id_; }
auto endpoint() const { return server_->endpoint(); }
auto worker_endpoint(int worker_id) { return coord_->GetEndpoint(worker_id); }
auto worker_ids() { return coord_->GetWorkerIds(); }
int worker_id() const { return worker->worker_id_; }
auto endpoint() const { return worker->server.endpoint(); }
auto worker_endpoint(int worker_id) {
return worker->coord.GetEndpoint(worker_id);
}
auto worker_ids() { return worker->coord.GetWorkerIds(); }
void join() { worker_thread_.join(); }
private:
std::thread worker_thread_;
std::experimental::optional<Server> server_;
std::experimental::optional<WorkerCoordination> coord_;
std::experimental::optional<ClientPool> client_pool_;
std::experimental::optional<ClusterDiscoveryWorker> discovery_;
std::atomic<int> worker_id_{0};
std::experimental::optional<Worker> worker;
};
TEST(Distributed, Coordination) {
@ -167,3 +171,9 @@ TEST(Distributed, ClusterDiscovery) {
for (auto &worker : workers) worker->join();
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
::testing::FLAGS_gtest_death_test_style = "threadsafe";
return RUN_ALL_TESTS();
}

View File

@ -57,8 +57,12 @@ class RpcWorkerClientsTest : public ::testing::Test {
void TearDown() override {
std::vector<std::thread> wait_on_shutdown;
for (auto &worker : workers_coord_)
wait_on_shutdown.emplace_back([&worker]() { worker->WaitForShutdown(); });
for (int i = 0; i < workers_coord_.size(); ++i) {
wait_on_shutdown.emplace_back([i, this]() {
workers_coord_[i]->WaitForShutdown();
workers_server_[i] = nullptr;
});
}
std::this_thread::sleep_for(300ms);