Improve error handling related to the dynamic graph partitioning

Summary:
Fixing bugs in error handling in the following cases (DGP ON):
  * SIGKILL master
  * SIGKILL worker
  * SIGTERM master

Reviewers: mferencevic, msantl, vkasljevic

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1896
This commit is contained in:
Marko Budiselic 2019-04-11 14:57:54 +02:00
parent 4029026c3b
commit 2f4ca7d3f7
10 changed files with 92 additions and 64 deletions

View File

@ -88,4 +88,6 @@ std::string Coordination::GetWorkerName(const io::network::Endpoint &endpoint) {
return fmt::format("unknown worker ({})", endpoint);
}
bool Coordination::IsClusterAlive() { return cluster_alive_; }
} // namespace distributed

View File

@ -89,6 +89,9 @@ class Coordination {
server_.Register<TRequestResponse>(callback);
}
/// Returns `true` if the cluster is in a consistent state.
bool IsClusterAlive();
protected:
/// Adds a worker to the coordination. This function can be called multiple
/// times to replace an existing worker.
@ -99,6 +102,8 @@ class Coordination {
communication::rpc::Server server_;
std::atomic<bool> cluster_alive_{true};
private:
std::unordered_map<int, io::network::Endpoint> workers_;
mutable std::mutex lock_;

View File

@ -181,6 +181,11 @@ bool MasterCoordination::AwaitShutdown(
}
LOG(INFO) << "Shutdown of all workers is complete.";
// Some RPC servers might still depend on the cluster status to shut down. At
// this point all workers are down which means that the cluster is also not
// alive any more.
cluster_alive_.store(false);
// Shutdown our RPC server.
server_.Shutdown();
server_.AwaitShutdown();
@ -192,8 +197,6 @@ bool MasterCoordination::AwaitShutdown(
void MasterCoordination::Shutdown() { alive_.store(false); }
bool MasterCoordination::IsClusterAlive() { return cluster_alive_; }
void MasterCoordination::IssueHeartbeats() {
std::lock_guard<std::mutex> guard(master_lock_);
auto workers = GetWorkers();

View File

@ -78,9 +78,6 @@ class MasterCoordination final : public Coordination {
/// Hints that the coordination should start shutting down the whole cluster.
void Shutdown();
/// Returns `true` if the cluster is in a consistent state.
bool IsClusterAlive();
private:
/// Sends a heartbeat request to all workers.
void IssueHeartbeats();
@ -103,7 +100,6 @@ class MasterCoordination final : public Coordination {
// Flags used for shutdown.
std::atomic<bool> alive_{true};
std::atomic<bool> cluster_alive_{true};
};
} // namespace distributed

View File

@ -35,7 +35,7 @@ WorkerCoordination::WorkerCoordination(
});
server_.Register<HeartbeatRpc>([&](const auto &req_reader,
auto *res_builder) {
auto *res_builder) {
std::lock_guard<std::mutex> guard(heartbeat_lock_);
last_heartbeat_time_ = std::chrono::steady_clock::now();
if (!scheduler_.IsRunning()) {

View File

@ -48,9 +48,6 @@ class WorkerCoordination final : public Coordination {
/// Hints that the coordination should start shutting down the worker.
void Shutdown();
/// Returns `true` if the cluster is in a consistent state.
bool IsClusterAlive();
private:
// Heartbeat variables
std::mutex heartbeat_lock_;

View File

@ -33,50 +33,61 @@ namespace distributed::dgp {
Partitioner::Partitioner(database::GraphDb *db) : db_(db) {}
std::pair<double, bool> Partitioner::Partition() {
auto dba = db_->Access();
VLOG(21) << "Starting DynamicGraphPartitioner in tx: "
<< dba->transaction().id_;
auto data = FindMigrations(*dba);
auto failed_partitioning_data =
std::make_pair(std::numeric_limits<double>::min(), false);
// Note, in distributed system TxBegin can throw because the server that
// assigns transaction numbers might be unavailable.
try {
VertexMigrator migrator(dba.get());
for (auto &migration : data.migrations) {
migrator.MigrateVertex(migration.first, migration.second);
}
auto apply_futures = db_->updates_clients().UpdateApplyAll(
db_->WorkerId(), dba->transaction().id_);
for (auto &future : apply_futures) {
switch (future.get()) {
case distributed::UpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError(
"Failed to relocate vertex due to SerializationError");
case distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
throw query::RemoveAttachedVertexException();
case distributed::UpdateResult::UPDATE_DELETED_ERROR:
throw query::QueryRuntimeException(
"Failed to apply deferred updates due to RecordDeletedError");
case distributed::UpdateResult::LOCK_TIMEOUT_ERROR:
throw utils::LockTimeoutException(
"Failed to apply deferred update due to LockTimeoutException");
case distributed::UpdateResult::DONE:
break;
auto dba = db_->Access();
VLOG(21) << "Starting DynamicGraphPartitioner in tx: "
<< dba->transaction().id_;
try {
auto data = FindMigrations(*dba);
VertexMigrator migrator(dba.get());
for (auto &migration : data.migrations) {
migrator.MigrateVertex(migration.first, migration.second);
}
}
dba->Commit();
VLOG(21) << "Sucesfully migrated " << data.migrations.size()
<< " vertices..";
return std::make_pair(data.score, true);
} catch (const utils::BasicException &e) {
VLOG(21) << "Didn't succeed in relocating; " << e.what();
dba->Abort();
// Returning VertexAccessors after Abort might not be a good idea. + The
// returned migrations are entirely useless because the engine didn't
// succeed to migrate anything.
return std::make_pair(data.score, false);
auto apply_futures = db_->updates_clients().UpdateApplyAll(
db_->WorkerId(), dba->transaction().id_);
for (auto &future : apply_futures) {
switch (future.get()) {
case distributed::UpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError(
"Failed to relocate vertex due to SerializationError");
case distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
throw query::RemoveAttachedVertexException();
case distributed::UpdateResult::UPDATE_DELETED_ERROR:
throw query::QueryRuntimeException(
"Failed to apply deferred updates due to RecordDeletedError");
case distributed::UpdateResult::LOCK_TIMEOUT_ERROR:
throw utils::LockTimeoutException(
"Failed to apply deferred update due to LockTimeoutException");
case distributed::UpdateResult::DONE:
break;
}
}
dba->Commit();
VLOG(21) << "Sucesfully migrated " << data.migrations.size()
<< " vertices with score " << data.score << ".";
return std::make_pair(data.score, true);
} catch (const utils::BasicException &e) {
VLOG(21) << "Didn't succeed in relocating; " << e.what();
dba->Abort();
// Returning VertexAccessors after Abort might not be a good idea. + The
// returned migrations are entirely useless because the engine didn't
// succeed to migrate anything.
return failed_partitioning_data;
}
} catch (const communication::rpc::RpcFailedException &e) {
// Transaction start failed because BeginRpc failed. Nothing to cleanup.
// Any other RpcFailedExceptions should be handeled in the inner try block.
VLOG(21) << "Failed to start DGP transaction; " << e.what();
return failed_partitioning_data;
} catch (const std::exception &e) {
LOG(FATAL) << "Unhandled exception during partitioning. " << e.what();
}
}
@ -93,7 +104,8 @@ MigrationsData Partitioner::FindMigrations(database::GraphDbAccessor &dba) {
double average_vertex_count =
total_vertex_count * 1.0 / worker_vertex_count.size();
if (average_vertex_count == 0) return MigrationsData(0);
if (average_vertex_count == 0)
return MigrationsData(std::numeric_limits<double>::min());
double local_graph_score = 0;

View File

@ -64,7 +64,7 @@ class TokenSharingRpcServer {
}
// Try to transfer the token until successful.
while (true) {
while (!shutting_down_) {
try {
coordination_->GetClientPool(next_worker)->Call<TokenTransferRpc>();
break;
@ -95,6 +95,10 @@ class TokenSharingRpcServer {
// TODO (buda): Solve this better in the future since this blocks
// shutting down until spinner steps complete.
while (!token_) {
// Cluster state has to be examined here because if one of the workers
// is down it doesn't make sense to wait for the token because token
// probably won't arrive back.
if (!coordination_->IsClusterAlive()) return;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}

View File

@ -198,9 +198,11 @@ def run_test(args):
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
memgraph_binary = os.path.join(PROJECT_DIR, "build", "memgraph")
memgraph_binary = os.path.join(PROJECT_DIR, "build",
"memgraph_distributed")
if not os.path.exists(memgraph_binary):
memgraph_binary = os.path.join(PROJECT_DIR, "build_debug", "memgraph")
memgraph_binary = os.path.join(PROJECT_DIR, "build_debug",
"memgraph_distributed")
parser = argparse.ArgumentParser()
parser.add_argument("--memgraph", default=memgraph_binary)

View File

@ -29,10 +29,12 @@ def wait_for_server(port, delay=0.1):
time.sleep(delay)
def generate_args(memgraph_binary, temporary_dir, worker_id):
def generate_args(memgraph_binary, test_master_flags, temporary_dir,
worker_id):
args = [memgraph_binary]
if worker_id == 0:
args.append("--master")
args.extend(test_master_flags)
else:
args.extend(["--worker", "--worker-id", str(worker_id)])
args.extend(["--master-host", "127.0.0.1", "--master-port", "10000"])
@ -55,9 +57,10 @@ def worker_id_to_name(worker_id):
return "worker {}".format(worker_id)
def execute_test(memgraph_binary, tester_binary, cluster_size, disaster,
on_worker_id, execute_query):
args = {"cluster_size": cluster_size, "disaster": disaster,
def execute_test(memgraph_binary, test_master_flags, tester_binary,
cluster_size, disaster, on_worker_id, execute_query):
args = {"cluster_size": cluster_size,
"test_master_flags": test_master_flags, "disaster": disaster,
"on_worker_id": on_worker_id, "execute_query": execute_query}
print("\033[1;36m~~ Executing test with arguments:",
json.dumps(args, sort_keys=True), "~~\033[0m")
@ -69,7 +72,8 @@ def execute_test(memgraph_binary, tester_binary, cluster_size, disaster,
cleanup()
for worker_id in range(cluster_size):
workers.append(subprocess.Popen(
generate_args(memgraph_binary, tempdir.name, worker_id)))
generate_args(memgraph_binary, test_master_flags,
tempdir.name, worker_id)))
time.sleep(0.2)
assert workers[worker_id].poll() is None, \
"The {} process died prematurely!".format(
@ -148,10 +152,13 @@ if __name__ == "__main__":
for cluster_size in [3, 5]:
for worker_id in [0, 1]:
for disaster in ["terminate", "kill"]:
for execute_query in [False, True]:
execute_test(args.memgraph, args.tester, cluster_size,
disaster, worker_id, execute_query)
for test_master_flags in [["--dynamic-graph-partitioner-enabled"],
[]]:
for disaster in ["terminate", "kill"]:
for execute_query in [False, True]:
execute_test(args.memgraph, test_master_flags,
args.tester, cluster_size, disaster,
worker_id, execute_query)
print("\033[1;32m~~ The test finished successfully ~~\033[0m")
sys.exit(0)