Rename replica_check_freq to instance_check_freq

This commit is contained in:
Andi Skrgat 2024-01-18 08:28:41 +01:00
parent 38ade99652
commit 8884a0ea78
2 changed files with 17 additions and 22 deletions

View File

@ -40,7 +40,7 @@ CoordinatorClient::~CoordinatorClient() {
});
const auto endpoint = rpc_client_.Endpoint();
// Logging can throw
spdlog::trace("Closing replication client on {}:{}", endpoint.address, endpoint.port);
spdlog::trace("Closing coordinator client on {}:{}", endpoint.address, endpoint.port);
}
void CoordinatorClient::StartFrequentCheck() {
@ -74,6 +74,7 @@ auto CoordinatorClient::InstanceName() const -> std::string_view { return config
auto CoordinatorClient::Endpoint() const -> io::network::Endpoint const & { return rpc_client_.Endpoint(); }
auto CoordinatorClient::Config() const -> CoordinatorClientConfig const & { return config_; }
////// AF design choice
auto CoordinatorClient::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo const & {
MG_ASSERT(config_.replication_client_info.has_value(), "No ReplicationClientInfo for MAIN instance!");
return *config_.replication_client_info;
@ -94,15 +95,13 @@ auto CoordinatorClient::GetLastTimeResponse() -> std::chrono::system_clock::time
auto CoordinatorClient::SendPromoteReplicaToMainRpc(
std::vector<CoordinatorClientConfig::ReplicationClientInfo> replication_clients_info) const -> bool {
try {
{
auto stream{rpc_client_.Stream<PromoteReplicaToMainRpc>(std::move(replication_clients_info))};
if (!stream.AwaitResponse().success) {
spdlog::error("Failed to perform failover!");
return false;
}
spdlog::info("Sent failover RPC from coordinator to new main!");
return true;
auto stream{rpc_client_.Stream<PromoteReplicaToMainRpc>(std::move(replication_clients_info))};
if (!stream.AwaitResponse().success) {
spdlog::error("Failed to perform failover!");
return false;
}
spdlog::info("Sent failover RPC from coordinator to new main!");
return true;
} catch (const rpc::RpcFailedException &) {
spdlog::error("Failed to send failover RPC from coordinator to new main!");
}

View File

@ -1073,21 +1073,17 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param
if (!FLAGS_coordinator) {
throw QueryRuntimeException("Only coordinator can register coordinator server!");
}
// TODO: MemoryResource for EvaluationContext, it should probably be passed as
// the argument to Callback.
EvaluationContext evaluation_context{.timestamp = QueryTimestamp(), .parameters = parameters};
auto evaluator = PrimitiveLiteralExpressionEvaluator{evaluation_context};
auto coordinator_socket_address_tv = coordinator_query->coordinator_socket_address_->Accept(evaluator);
auto replication_socket_address_tv = coordinator_query->socket_address_->Accept(evaluator);
callback.fn = [handler = CoordQueryHandler{dbms_handler}, coordinator_socket_address_tv,
replication_socket_address_tv, main_check_frequency = config.replication_replica_check_frequency,
instance_name = coordinator_query->instance_name_,
sync_mode = coordinator_query->sync_mode_]() mutable {
handler.RegisterReplicaCoordinatorServer(std::string(replication_socket_address_tv.ValueString()),
std::string(coordinator_socket_address_tv.ValueString()),
main_check_frequency, instance_name, sync_mode);
return std::vector<std::vector<TypedValue>>();
};
callback.fn =
[handler = CoordQueryHandler{dbms_handler}, coordinator_socket_address_tv, replication_socket_address_tv,
replica_check_frequency = config.replication_replica_check_frequency,
instance_name = coordinator_query->instance_name_, sync_mode = coordinator_query->sync_mode_]() mutable {
handler.RegisterReplicaCoordinatorServer(std::string(replication_socket_address_tv.ValueString()),
std::string(coordinator_socket_address_tv.ValueString()),
replica_check_frequency, instance_name, sync_mode);
return std::vector<std::vector<TypedValue>>();
};
notifications->emplace_back(
SeverityLevel::INFO, NotificationCode::REGISTER_COORDINATOR_SERVER,