From 986ea37ead0ee89e33c7361fd1d8896ddc35cfbf Mon Sep 17 00:00:00 2001 From: Andi Skrgat <andi8647@gmail.com> Date: Fri, 19 Jan 2024 12:39:14 +0100 Subject: [PATCH] Refactoring of coord state --- src/coordination/CMakeLists.txt | 1 - src/coordination/coordinator_client.cpp | 33 ++- src/coordination/coordinator_state.cpp | 212 ++++++++++-------- .../coordination/coordinator_client.hpp | 7 +- .../coordination/coordinator_client_info.hpp | 40 +++- .../coordination/coordinator_config.hpp | 18 +- ...fo.hpp => coordinator_instance_status.hpp} | 12 +- .../coordination/coordinator_state.hpp | 16 +- src/dbms/coordinator_handler.cpp | 20 +- src/dbms/coordinator_handler.hpp | 14 +- src/query/interpreter.cpp | 68 +++--- src/query/interpreter.hpp | 21 +- .../client_initiated_failover.py | 31 +-- tests/stress/test_config.py | 194 ---------------- 14 files changed, 260 insertions(+), 427 deletions(-) rename src/coordination/include/coordination/{coordinator_entity_info.hpp => coordinator_instance_status.hpp} (80%) delete mode 100644 tests/stress/test_config.py diff --git a/src/coordination/CMakeLists.txt b/src/coordination/CMakeLists.txt index 29061a3f9..98d682830 100644 --- a/src/coordination/CMakeLists.txt +++ b/src/coordination/CMakeLists.txt @@ -7,7 +7,6 @@ target_sources(mg-coordination include/coordination/coordinator_rpc.hpp include/coordination/coordinator_server.hpp include/coordination/coordinator_config.hpp - include/coordination/coordinator_entity_info.hpp include/coordination/coordinator_exceptions.hpp include/coordination/coordinator_slk.hpp include/coordination/constants.hpp diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index 486565342..18b01663d 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -27,11 +27,15 @@ auto CreateClientContext(const memgraph::coordination::CoordinatorClientConfig & } } // namespace -CoordinatorClient::CoordinatorClient(const CoordinatorClientConfig &config) +CoordinatorClient::CoordinatorClient(CoordinatorClientConfig config, + std::function<void(std::string_view)> freq_check_cb) : rpc_context_{CreateClientContext(config)}, rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port), &rpc_context_}, - config_{config} {} + config_{std::move(config)}, + freq_check_cb_{std::move(freq_check_cb)} { + StartFrequentCheck(); +} CoordinatorClient::~CoordinatorClient() { auto exit_job = utils::OnScopeExit([&] { @@ -46,22 +50,27 @@ CoordinatorClient::~CoordinatorClient() { void CoordinatorClient::StartFrequentCheck() { MG_ASSERT(config_.health_check_frequency_sec > std::chrono::seconds(0), "Health check frequency must be greater than 0"); - replica_checker_.Run("Coord checker", config_.health_check_frequency_sec, [rpc_client = &rpc_client_] { - try { - auto stream{rpc_client->Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()}; - stream.AwaitResponse(); - // last_response_time->store(std::chrono::system_clock::now(), std::memory_order_acq_rel); - } catch (const rpc::RpcFailedException &) { - // Nothing to do...wait for a reconnect - } - }); + + replica_checker_.Run( + "Coord checker", config_.health_check_frequency_sec, + [instance_name = config_.instance_name, rpc_client = &rpc_client_, freq_check_cb = freq_check_cb_] { + try { + auto stream{rpc_client->Stream<memgraph::replication_coordination_glue::FrequentHeartbeatRpc>()}; + stream.AwaitResponse(); + freq_check_cb(instance_name); + } catch (const rpc::RpcFailedException &) { + // Nothing to do...wait for a reconnect + } + }); } void CoordinatorClient::StopFrequentCheck() { replica_checker_.Stop(); } auto CoordinatorClient::InstanceName() const -> std::string_view { return config_.instance_name; } -auto CoordinatorClient::Endpoint() const -> io::network::Endpoint const & { return rpc_client_.Endpoint(); } +auto CoordinatorClient::Endpoint() const -> const io::network::Endpoint * { return &rpc_client_.Endpoint(); } +// TODO: remove this method and implement copy constructor auto CoordinatorClient::Config() const -> CoordinatorClientConfig const & { return config_; } +auto CoordinatorClient::Callback() const -> std::function<void(std::string_view)> const & { return freq_check_cb_; } ////// AF design choice auto CoordinatorClient::ReplicationClientInfo() const -> CoordinatorClientConfig::ReplicationClientInfo const & { diff --git a/src/coordination/coordinator_state.cpp b/src/coordination/coordinator_state.cpp index 0ab152aa1..84c075cc7 100644 --- a/src/coordination/coordinator_state.cpp +++ b/src/coordination/coordinator_state.cpp @@ -16,7 +16,6 @@ #include "coordination/coordinator_client.hpp" #include "coordination/coordinator_cluster_config.hpp" #include "coordination/coordinator_config.hpp" -#include "coordination/coordinator_entity_info.hpp" #include "flags/replication.hpp" #include "spdlog/spdlog.h" #include "utils/logging.hpp" @@ -53,8 +52,10 @@ CoordinatorState::CoordinatorState() { } } -/// TODO: Don't return client, start here the client -auto CoordinatorState::RegisterReplica(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus { +auto CoordinatorState::RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus { + MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), + "Coordinator cannot register replica since variant holds wrong alternative"); + const auto name_endpoint_status = std::visit(memgraph::utils::Overloaded{[](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) { return RegisterMainReplicaCoordinatorStatus::NOT_COORDINATOR; @@ -72,32 +73,42 @@ auto CoordinatorState::RegisterReplica(const CoordinatorClientConfig &config) -> return name_endpoint_status; } - auto callback = [&]() -> void { + auto freq_check_cb = [&](std::string_view instance_name) -> void { MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); auto ®istered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_; auto replica_client_info = std::ranges::find_if( registered_replicas_info, - [&config](const CoordinatorClientInfo &replica) { return replica.instance_name_ == config.instance_name; }); + [instance_name](const CoordinatorClientInfo &replica) { return replica.instance_name_ == instance_name; }); - MG_ASSERT(replica_client_info == registered_replicas_info.end(), "Replica {} not found in registered replicas info", - config.instance_name); + // TODO: Refactor so that we just know about instances, nothing more. Here after failover it can happen that replica + // has become main MG_ASSERT(replica_client_info != registered_replicas_info.end(), "Replica {} not found in + // registered replicas info", + // instance_name); - auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>( - std::chrono::system_clock::now() - replica_client_info->last_response_time_.load(std::memory_order_acquire)); - - replica_client_info->is_alive_ = - sec_since_last_response.count() <= CoordinatorClusterConfig::alive_response_time_difference_sec_; + if (replica_client_info == registered_replicas_info.end()) { + auto ®istered_main_info = std::get<CoordinatorData>(data_).registered_main_info_; + MG_ASSERT(registered_main_info->instance_name_ == instance_name, "Instance is neither a replica nor main..."); + registered_main_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release); + } else { + replica_client_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release); + } }; - // Maybe no need to return client if you can start replica client here - auto *replica_client = &std::get<CoordinatorData>(data_).registered_replicas_.emplace_back(config); - replica_client->StartFrequentCheck(); + auto *coord_client = + &std::get<CoordinatorData>(data_).registered_replicas_.emplace_back(std::move(config), std::move(freq_check_cb)); + + std::get<CoordinatorData>(data_).registered_replicas_info_.emplace_back(coord_client->InstanceName(), + coord_client->Endpoint()); + return RegisterMainReplicaCoordinatorStatus::SUCCESS; } -auto CoordinatorState::RegisterMain(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus { +auto CoordinatorState::RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus { + MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), + "Coordinator cannot register main since variant holds wrong alternative"); + const auto endpoint_status = std::visit( memgraph::utils::Overloaded{ [](const CoordinatorMainReplicaData & /*coordinator_main_replica_data*/) { @@ -110,87 +121,86 @@ auto CoordinatorState::RegisterMain(const CoordinatorClientConfig &config) -> Re return endpoint_status; } - auto ®istered_main = std::get<CoordinatorData>(data_).registered_main_; - registered_main = std::make_unique<CoordinatorClient>(config); - - auto cb = [&]() -> void { + auto freq_check_cb = [&](std::string_view instance_name) -> void { MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), "Can't execute CoordinatorClient's callback since variant holds wrong alternative"); + MG_ASSERT(std::get<CoordinatorData>(data_).registered_main_info_.has_value(), + "Main info is not set, but callback is called"); + // TODO When we will support restoration of main, we have to assert that the instance is main or replica, not at + // this point.... auto ®istered_main_info = std::get<CoordinatorData>(data_).registered_main_info_; - MG_ASSERT(registered_main_info.instance_name_ == config.instance_name, - "Registered main instance name {} does not match config instance name {}", - registered_main_info.instance_name_, config.instance_name); + MG_ASSERT(registered_main_info->instance_name_ == instance_name, + "Callback called for wrong instance name: {}, expected: {}", instance_name, + registered_main_info->instance_name_); - auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>( - std::chrono::system_clock::now() - registered_main_info.last_response_time_.load(std::memory_order_acquire)); - registered_main_info.is_alive_ = - sec_since_last_response.count() <= CoordinatorClusterConfig::alive_response_time_difference_sec_; + registered_main_info->last_response_time_.store(std::chrono::system_clock::now(), std::memory_order_release); - if (!registered_main_info.is_alive_) { - spdlog::warn("Main is not alive, starting failover"); - switch (auto failover_status = DoFailover(); failover_status) { - using enum DoFailoverStatus; - case ALL_REPLICAS_DOWN: - spdlog::warn("Failover aborted since all replicas are down!"); - case MAIN_ALIVE: - spdlog::warn("Failover aborted since main is alive!"); - case CLUSTER_UNINITIALIZED: - spdlog::warn("Failover aborted since cluster is uninitialized!"); - case SUCCESS: - break; - } - } + // if (!registered_main_info->is_alive_) { + // spdlog::warn("Main is not alive, starting failover"); + // switch (auto failover_status = DoFailover(); failover_status) { + // using enum DoFailoverStatus; + // case ALL_REPLICAS_DOWN: + // spdlog::warn("Failover aborted since all replicas are down!"); + // case MAIN_ALIVE: + // spdlog::warn("Failover aborted since main is alive!"); + // case CLUSTER_UNINITIALIZED: + // spdlog::warn("Failover aborted since cluster is uninitialized!"); + // case SUCCESS: + // break; + // } + // } }; - registered_main->StartFrequentCheck(); + auto ®istered_main = std::get<CoordinatorData>(data_).registered_main_; + registered_main = std::make_unique<CoordinatorClient>(std::move(config), std::move(freq_check_cb)); + + auto ®istered_main_info = std::get<CoordinatorData>(data_).registered_main_info_; + registered_main_info.emplace(registered_main->InstanceName(), registered_main->Endpoint()); + return RegisterMainReplicaCoordinatorStatus::SUCCESS; } -auto CoordinatorState::ShowReplicas() const -> std::vector<CoordinatorEntityInfo> { +auto CoordinatorState::ShowReplicas() const -> std::vector<CoordinatorInstanceStatus> { MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), "Can't call show replicas on data_, as variant holds wrong alternative"); - std::vector<CoordinatorEntityInfo> result; - const auto ®istered_replicas = std::get<CoordinatorData>(data_).registered_replicas_; - result.reserve(registered_replicas.size()); - std::ranges::transform(registered_replicas, std::back_inserter(result), [](const auto &replica) { - return CoordinatorEntityInfo{replica.InstanceName(), replica.Endpoint()}; - }); - return result; + const auto ®istered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_; + std::vector<CoordinatorInstanceStatus> instances_status; + instances_status.reserve(registered_replicas_info.size()); + + std::ranges::transform( + registered_replicas_info, std::back_inserter(instances_status), + [](const CoordinatorClientInfo &coord_client_info) { + const auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>( + std::chrono::system_clock::now() - + coord_client_info.last_response_time_.load(std::memory_order_acquire)) + .count(); + + return CoordinatorInstanceStatus{ + .instance_name = coord_client_info.instance_name_, + .socket_address = coord_client_info.endpoint->SocketAddress(), + .is_alive = sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_}; + }); + return instances_status; } -auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorEntityInfo> { +auto CoordinatorState::ShowMain() const -> std::optional<CoordinatorInstanceStatus> { MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), "Can't call show main on data_, as variant holds wrong alternative"); - const auto ®istered_main = std::get<CoordinatorData>(data_).registered_main_; - if (registered_main) { - return CoordinatorEntityInfo{registered_main->InstanceName(), registered_main->Endpoint()}; + const auto &main = std::get<CoordinatorData>(data_).registered_main_info_; + if (!main.has_value()) { + return std::nullopt; } - return std::nullopt; -} + const auto sec_since_last_response = + std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - + main->last_response_time_.load(std::memory_order_acquire)) + .count(); -auto CoordinatorState::PingReplicas() const -> std::unordered_map<std::string_view, bool> { - MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), - "Can't call ping replicas on data_, as variant holds wrong alternative"); - std::unordered_map<std::string_view, bool> result; - const auto ®istered_replicas = std::get<CoordinatorData>(data_).registered_replicas_; - result.reserve(registered_replicas.size()); - // for (const CoordinatorClient &replica_client : registered_replicas) { - // result.emplace(replica_client.InstanceName(), replica_client.DoHealthCheck()); - // } - - return result; -} - -auto CoordinatorState::PingMain() const -> std::optional<CoordinatorEntityHealthInfo> { - MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), - "Can't call show main on data_, as variant holds wrong alternative"); - const auto ®istered_main = std::get<CoordinatorData>(data_).registered_main_; - if (registered_main) { - // return CoordinatorEntityHealthInfo{registered_main->InstanceName(), registered_main->DoHealthCheck()}; - } - return std::nullopt; -} + return CoordinatorInstanceStatus{ + .instance_name = main->instance_name_, + .socket_address = main->endpoint->SocketAddress(), + .is_alive = sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_}; +}; [[nodiscard]] auto CoordinatorState::DoFailover() -> DoFailoverStatus { // 1. MAIN is already down, stop sending frequent checks @@ -201,32 +211,53 @@ auto CoordinatorState::PingMain() const -> std::optional<CoordinatorEntityHealth // 6. remove replica which was promoted to main from all replicas -> this will shut down RPC frequent check client // (coordinator) // 7. for new main start frequent checks (coordinator) - /* + MG_ASSERT(std::holds_alternative<CoordinatorData>(data_), "Cannot do failover since variant holds wrong alternative"); using ReplicationClientInfo = CoordinatorClientConfig::ReplicationClientInfo; // 1. - auto ¤t_main = std::get<CoordinatorData>(data_).registered_main_; + auto ¤t_main_info = std::get<CoordinatorData>(data_).registered_main_info_; - if (!current_main) { + if (!current_main_info.has_value()) { return DoFailoverStatus::CLUSTER_UNINITIALIZED; } - if (current_main->DoHealthCheck()) { + auto sec_since_last_response_main = + std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - + current_main_info->last_response_time_.load()) + .count(); + + if (sec_since_last_response_main <= CoordinatorClusterConfig::alive_response_time_difference_sec_) { return DoFailoverStatus::MAIN_ALIVE; } + + auto ¤t_main = std::get<CoordinatorData>(data_).registered_main_; + // TODO: stop pinging as soon as you figure out that failover is needed current_main->StopFrequentCheck(); // 2. // Get all replicas and find new main - auto ®istered_replicas = std::get<CoordinatorData>(data_).registered_replicas_; + auto ®istered_replicas_info = std::get<CoordinatorData>(data_).registered_replicas_info_; - const auto chosen_replica = std::ranges::find_if( - registered_replicas, [](const CoordinatorClient &replica) { return replica.DoHealthCheck(); }); - if (chosen_replica == registered_replicas.end()) { + const auto chosen_replica_info = + std::ranges::find_if(registered_replicas_info, [](const CoordinatorClientInfo &client_info) { + auto sec_since_last_response = std::chrono::duration_cast<std::chrono::seconds>( + std::chrono::system_clock::now() - client_info.last_response_time_.load()) + .count(); + return sec_since_last_response <= CoordinatorClusterConfig::alive_response_time_difference_sec_; + }); + if (chosen_replica_info == registered_replicas_info.end()) { return DoFailoverStatus::ALL_REPLICAS_DOWN; } + auto ®istered_replicas = std::get<CoordinatorData>(data_).registered_replicas_; + const auto chosen_replica = + std::ranges::find_if(registered_replicas, [&chosen_replica_info](const CoordinatorClient &replica) { + return replica.InstanceName() == chosen_replica_info->instance_name_; + }); + MG_ASSERT(chosen_replica != registered_replicas.end(), "Chosen replica {} not found in registered replicas", + chosen_replica_info->instance_name_); + std::vector<ReplicationClientInfo> repl_clients_info; repl_clients_info.reserve(registered_replicas.size() - 1); std::ranges::for_each(registered_replicas, [&chosen_replica, &repl_clients_info](const CoordinatorClient &replica) { @@ -239,9 +270,9 @@ auto CoordinatorState::PingMain() const -> std::optional<CoordinatorEntityHealth // Set on coordinator data of new main // allocate resources for new main, clear replication info on this replica as main // set last response time - auto potential_new_main = std::make_unique<CoordinatorClient>(chosen_replica->Config()); + auto potential_new_main = std::make_unique<CoordinatorClient>(chosen_replica->Config(), chosen_replica->Callback()); potential_new_main->ReplicationClientInfo().reset(); - potential_new_main->UpdateTimeCheck(chosen_replica->GetLastTimeResponse()); + auto potential_new_main_info = *chosen_replica_info; // 4. if (!chosen_replica->SendPromoteReplicaToMainRpc(std::move(repl_clients_info))) { @@ -252,15 +283,14 @@ auto CoordinatorState::PingMain() const -> std::optional<CoordinatorEntityHealth // 5. current_main = std::move(potential_new_main); + current_main_info.emplace(potential_new_main_info); // 6. remove old replica // TODO: Stop pinging chosen_replica before failover. // Check that it doesn't fail when you call StopFrequentCheck if it is already stopped registered_replicas.erase(chosen_replica); + registered_replicas_info.erase(chosen_replica_info); - // 7. - current_main->StartFrequentCheck(); - */ return DoFailoverStatus::SUCCESS; } diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index f33c3cf9c..535684286 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -27,7 +27,7 @@ class CoordinatorClient { using ReplClientInfo = CoordinatorClientConfig::ReplicationClientInfo; using ReplicationClientsInfo = std::vector<ReplClientInfo>; - explicit CoordinatorClient(const CoordinatorClientConfig &config); + explicit CoordinatorClient(CoordinatorClientConfig config, std::function<void(std::string_view)> freq_check_cb); ~CoordinatorClient(); @@ -43,10 +43,12 @@ class CoordinatorClient { auto SendPromoteReplicaToMainRpc(ReplicationClientsInfo replication_clients_info) const -> bool; auto InstanceName() const -> std::string_view; - auto Endpoint() const -> io::network::Endpoint const &; + auto Endpoint() const -> const io::network::Endpoint *; auto Config() const -> CoordinatorClientConfig const &; auto ReplicationClientInfo() const -> ReplClientInfo const &; auto ReplicationClientInfo() -> std::optional<ReplClientInfo> &; + // TODO: We should add copy constructor and then there won't be need for this + auto Callback() const -> std::function<void(std::string_view)> const &; friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) { return first.config_ == second.config_; @@ -60,6 +62,7 @@ class CoordinatorClient { mutable rpc::Client rpc_client_; CoordinatorClientConfig config_; + std::function<void(std::string_view)> freq_check_cb_; }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/coordinator_client_info.hpp b/src/coordination/include/coordination/coordinator_client_info.hpp index 9b31577a6..b8ac32a43 100644 --- a/src/coordination/include/coordination/coordinator_client_info.hpp +++ b/src/coordination/include/coordination/coordinator_client_info.hpp @@ -13,16 +13,52 @@ #ifdef MG_ENTERPRISE +#include "io/network/endpoint.hpp" + #include <atomic> #include <chrono> namespace memgraph::coordination { -// TODO: better connect this struct CoordinatorClientInfo { + CoordinatorClientInfo(std::string_view instance_name, const io::network::Endpoint *endpoint) + : last_response_time_(std::chrono::system_clock::now()), instance_name_(instance_name), endpoint(endpoint) {} + + ~CoordinatorClientInfo() = default; + + CoordinatorClientInfo(const CoordinatorClientInfo &other) + : last_response_time_(other.last_response_time_.load()), + instance_name_(other.instance_name_), + endpoint(other.endpoint) {} + + CoordinatorClientInfo &operator=(const CoordinatorClientInfo &other) { + if (this != &other) { + last_response_time_.store(other.last_response_time_.load()); + instance_name_ = other.instance_name_; + endpoint = other.endpoint; + } + return *this; + } + + CoordinatorClientInfo(CoordinatorClientInfo &&other) noexcept + : last_response_time_(other.last_response_time_.load()), + instance_name_(other.instance_name_), + endpoint(other.endpoint) {} + + CoordinatorClientInfo &operator=(CoordinatorClientInfo &&other) noexcept { + if (this != &other) { + last_response_time_.store(other.last_response_time_.load()); + instance_name_ = other.instance_name_; + endpoint = other.endpoint; + } + return *this; + } + + /// TODO: Add a method is_alive + std::atomic<std::chrono::system_clock::time_point> last_response_time_{}; - bool is_alive_{false}; std::string_view instance_name_; + const io::network::Endpoint *endpoint; }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/coordinator_config.hpp b/src/coordination/include/coordination/coordinator_config.hpp index e3a723177..f9fb900c4 100644 --- a/src/coordination/include/coordination/coordinator_config.hpp +++ b/src/coordination/include/coordination/coordinator_config.hpp @@ -25,16 +25,14 @@ namespace memgraph::coordination { inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0"; struct CoordinatorClientConfig { - const std::string instance_name; - const std::string ip_address; - const uint16_t port{}; - - // Frequency with which coordinator pings main/replicas about it status - const std::chrono::seconds health_check_frequency_sec{1}; + std::string instance_name; + std::string ip_address; + uint16_t port{}; + std::chrono::seconds health_check_frequency_sec{1}; // Info which coordinator will send to new main when performing failover struct ReplicationClientInfo { - // Should be the same as CoordinatorClientConfig's instance_name + // Must be the same as CoordinatorClientConfig's instance_name std::string instance_name; replication_coordination_glue::ReplicationMode replication_mode{}; std::string replication_ip_address; @@ -46,13 +44,13 @@ struct CoordinatorClientConfig { std::optional<ReplicationClientInfo> replication_client_info; struct SSL { - const std::string key_file; - const std::string cert_file; + std::string key_file; + std::string cert_file; friend bool operator==(const SSL &, const SSL &) = default; }; - const std::optional<SSL> ssl; + std::optional<SSL> ssl; friend bool operator==(CoordinatorClientConfig const &, CoordinatorClientConfig const &) = default; }; diff --git a/src/coordination/include/coordination/coordinator_entity_info.hpp b/src/coordination/include/coordination/coordinator_instance_status.hpp similarity index 80% rename from src/coordination/include/coordination/coordinator_entity_info.hpp rename to src/coordination/include/coordination/coordinator_instance_status.hpp index eb4321761..b220636f8 100644 --- a/src/coordination/include/coordination/coordinator_entity_info.hpp +++ b/src/coordination/include/coordination/coordinator_instance_status.hpp @@ -19,14 +19,10 @@ namespace memgraph::coordination { -struct CoordinatorEntityInfo { - std::string_view name; - const io::network::Endpoint &endpoint; -}; - -struct CoordinatorEntityHealthInfo { - std::string_view name; - bool alive; +struct CoordinatorInstanceStatus { + std::string_view instance_name; + std::string socket_address; + bool is_alive; }; } // namespace memgraph::coordination diff --git a/src/coordination/include/coordination/coordinator_state.hpp b/src/coordination/include/coordination/coordinator_state.hpp index b7d814a60..64d3d75a5 100644 --- a/src/coordination/include/coordination/coordinator_state.hpp +++ b/src/coordination/include/coordination/coordinator_state.hpp @@ -15,7 +15,7 @@ #include "coordination/coordinator_client.hpp" #include "coordination/coordinator_client_info.hpp" -#include "coordination/coordinator_entity_info.hpp" +#include "coordination/coordinator_instance_status.hpp" #include "coordination/coordinator_server.hpp" #include "coordination/failover_status.hpp" #include "coordination/register_main_replica_coordinator_status.hpp" @@ -40,17 +40,13 @@ class CoordinatorState { CoordinatorState(CoordinatorState &&) noexcept = delete; CoordinatorState &operator=(CoordinatorState &&) noexcept = delete; - [[nodiscard]] auto RegisterReplica(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus; + [[nodiscard]] auto RegisterReplica(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus; - [[nodiscard]] auto RegisterMain(const CoordinatorClientConfig &config) -> RegisterMainReplicaCoordinatorStatus; + [[nodiscard]] auto RegisterMain(CoordinatorClientConfig config) -> RegisterMainReplicaCoordinatorStatus; - auto ShowReplicas() const -> std::vector<CoordinatorEntityInfo>; + auto ShowReplicas() const -> std::vector<CoordinatorInstanceStatus>; - auto PingReplicas() const -> std::unordered_map<std::string_view, bool>; - - auto ShowMain() const -> std::optional<CoordinatorEntityInfo>; - - auto PingMain() const -> std::optional<CoordinatorEntityHealthInfo>; + auto ShowMain() const -> std::optional<CoordinatorInstanceStatus>; // The client code must check that the server exists before calling this method. auto GetCoordinatorServer() const -> CoordinatorServer &; @@ -63,7 +59,7 @@ class CoordinatorState { std::list<CoordinatorClient> registered_replicas_; std::vector<CoordinatorClientInfo> registered_replicas_info_; std::unique_ptr<CoordinatorClient> registered_main_; - CoordinatorClientInfo registered_main_info_; + std::optional<CoordinatorClientInfo> registered_main_info_; }; struct CoordinatorMainReplicaData { diff --git a/src/dbms/coordinator_handler.cpp b/src/dbms/coordinator_handler.cpp index 139824607..01abb31a1 100644 --- a/src/dbms/coordinator_handler.cpp +++ b/src/dbms/coordinator_handler.cpp @@ -19,32 +19,24 @@ namespace memgraph::dbms { CoordinatorHandler::CoordinatorHandler(DbmsHandler &dbms_handler) : dbms_handler_(dbms_handler) {} -auto CoordinatorHandler::RegisterReplicaOnCoordinator(const coordination::CoordinatorClientConfig &config) +auto CoordinatorHandler::RegisterReplicaOnCoordinator(coordination::CoordinatorClientConfig config) -> coordination::RegisterMainReplicaCoordinatorStatus { - return dbms_handler_.CoordinatorState().RegisterReplica(config); + return dbms_handler_.CoordinatorState().RegisterReplica(std::move(config)); } -auto CoordinatorHandler::RegisterMainOnCoordinator(const memgraph::coordination::CoordinatorClientConfig &config) +auto CoordinatorHandler::RegisterMainOnCoordinator(memgraph::coordination::CoordinatorClientConfig config) -> coordination::RegisterMainReplicaCoordinatorStatus { - return dbms_handler_.CoordinatorState().RegisterMain(config); + return dbms_handler_.CoordinatorState().RegisterMain(std::move(config)); } -auto CoordinatorHandler::ShowReplicasOnCoordinator() const -> std::vector<coordination::CoordinatorEntityInfo> { +auto CoordinatorHandler::ShowReplicasOnCoordinator() const -> std::vector<coordination::CoordinatorInstanceStatus> { return dbms_handler_.CoordinatorState().ShowReplicas(); } -auto CoordinatorHandler::PingReplicasOnCoordinator() const -> std::unordered_map<std::string_view, bool> { - return dbms_handler_.CoordinatorState().PingReplicas(); -} - -auto CoordinatorHandler::ShowMainOnCoordinator() const -> std::optional<coordination::CoordinatorEntityInfo> { +auto CoordinatorHandler::ShowMainOnCoordinator() const -> std::optional<coordination::CoordinatorInstanceStatus> { return dbms_handler_.CoordinatorState().ShowMain(); } -auto CoordinatorHandler::PingMainOnCoordinator() const -> std::optional<coordination::CoordinatorEntityHealthInfo> { - return dbms_handler_.CoordinatorState().PingMain(); -} - auto CoordinatorHandler::DoFailover() const -> coordination::DoFailoverStatus { return dbms_handler_.CoordinatorState().DoFailover(); } diff --git a/src/dbms/coordinator_handler.hpp b/src/dbms/coordinator_handler.hpp index b44ec7972..9632c76b5 100644 --- a/src/dbms/coordinator_handler.hpp +++ b/src/dbms/coordinator_handler.hpp @@ -16,7 +16,7 @@ #include "utils/result.hpp" #include "coordination/coordinator_config.hpp" -#include "coordination/coordinator_entity_info.hpp" +#include "coordination/coordinator_instance_status.hpp" #include "coordination/failover_status.hpp" #include "coordination/register_main_replica_coordinator_status.hpp" @@ -32,19 +32,15 @@ class CoordinatorHandler { public: explicit CoordinatorHandler(DbmsHandler &dbms_handler); - auto RegisterReplicaOnCoordinator(const coordination::CoordinatorClientConfig &config) + auto RegisterReplicaOnCoordinator(coordination::CoordinatorClientConfig config) -> coordination::RegisterMainReplicaCoordinatorStatus; - auto RegisterMainOnCoordinator(const coordination::CoordinatorClientConfig &config) + auto RegisterMainOnCoordinator(coordination::CoordinatorClientConfig config) -> coordination::RegisterMainReplicaCoordinatorStatus; - auto ShowReplicasOnCoordinator() const -> std::vector<coordination::CoordinatorEntityInfo>; + auto ShowReplicasOnCoordinator() const -> std::vector<coordination::CoordinatorInstanceStatus>; - auto ShowMainOnCoordinator() const -> std::optional<coordination::CoordinatorEntityInfo>; - - auto PingReplicasOnCoordinator() const -> std::unordered_map<std::string_view, bool>; - - auto PingMainOnCoordinator() const -> std::optional<coordination::CoordinatorEntityHealthInfo>; + auto ShowMainOnCoordinator() const -> std::optional<coordination::CoordinatorInstanceStatus>; auto DoFailover() const -> coordination::DoFailoverStatus; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 46035f859..4cc16cedc 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -110,7 +110,6 @@ #ifdef MG_ENTERPRISE #include "coordination/constants.hpp" -#include "coordination/coordinator_entity_info.hpp" #endif namespace memgraph::metrics { @@ -487,7 +486,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { .replication_ip_address = replication_ip, .replication_port = replication_port}; - const auto coordinator_client_config = + auto coordinator_client_config = coordination::CoordinatorClientConfig{.instance_name = instance_name, .ip_address = coordinator_server_ip, .port = coordinator_server_port, @@ -495,7 +494,7 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { .replication_client_info = repl_config, .ssl = std::nullopt}; - auto status = coordinator_handler_.RegisterReplicaOnCoordinator(coordinator_client_config); + auto status = coordinator_handler_.RegisterReplicaOnCoordinator(std::move(coordinator_client_config)); switch (status) { using enum memgraph::coordination::RegisterMainReplicaCoordinatorStatus; case NAME_EXISTS: @@ -521,13 +520,14 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { throw QueryRuntimeException("Invalid socket address!"); } const auto [ip, port] = *maybe_ip_and_port; - const auto config = coordination::CoordinatorClientConfig{.instance_name = instance_name, - .ip_address = ip, - .port = port, - .health_check_frequency_sec = instance_check_frequency, - .ssl = std::nullopt}; + auto coordinator_client_config = + coordination::CoordinatorClientConfig{.instance_name = instance_name, + .ip_address = ip, + .port = port, + .health_check_frequency_sec = instance_check_frequency, + .ssl = std::nullopt}; - auto status = coordinator_handler_.RegisterMainOnCoordinator(config); + auto status = coordinator_handler_.RegisterMainOnCoordinator(std::move(coordinator_client_config)); switch (status) { using enum memgraph::coordination::RegisterMainReplicaCoordinatorStatus; case NAME_EXISTS: @@ -564,20 +564,17 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { } } + // TODO: Remove this method, probably not needed. std::vector<MainReplicaStatus> ShowMainReplicaStatus( - const std::vector<coordination::CoordinatorEntityInfo> &replicas, - const std::unordered_map<std::string_view, bool> &health_check_replicas, - const std::optional<coordination::CoordinatorEntityInfo> &main, - const std::optional<coordination::CoordinatorEntityHealthInfo> &health_check_main) const override { + const std::vector<coordination::CoordinatorInstanceStatus> &replicas, + const std::optional<coordination::CoordinatorInstanceStatus> &main) const override { std::vector<MainReplicaStatus> result{}; result.reserve(replicas.size() + 1); // replicas + 1 main - std::ranges::transform( - replicas, std::back_inserter(result), [&health_check_replicas](const auto &replica) -> MainReplicaStatus { - return {replica.name, replica.endpoint.SocketAddress(), health_check_replicas.at(replica.name), false}; - }); + std::ranges::transform(replicas, std::back_inserter(result), [](const auto &replica) -> MainReplicaStatus { + return {replica.instance_name, replica.socket_address, replica.is_alive, false}; + }); if (main) { - bool is_main_alive = health_check_main.has_value() ? health_check_main.value().alive : false; - result.emplace_back(main->name, main->endpoint.SocketAddress(), is_main_alive, true); + result.emplace_back(main->instance_name, main->socket_address, main->is_alive, true); } return result; } @@ -585,21 +582,13 @@ class CoordQueryHandler final : public query::CoordinatorQueryHandler { #endif #ifdef MG_ENTERPRISE - std::vector<coordination::CoordinatorEntityInfo> ShowReplicasOnCoordinator() const override { + std::vector<coordination::CoordinatorInstanceStatus> ShowReplicasOnCoordinator() const override { return coordinator_handler_.ShowReplicasOnCoordinator(); } - std::unordered_map<std::string_view, bool> PingReplicasOnCoordinator() const override { - return coordinator_handler_.PingReplicasOnCoordinator(); - } - - std::optional<coordination::CoordinatorEntityInfo> ShowMainOnCoordinator() const override { + std::optional<coordination::CoordinatorInstanceStatus> ShowMainOnCoordinator() const override { return coordinator_handler_.ShowMainOnCoordinator(); } - - std::optional<coordination::CoordinatorEntityHealthInfo> PingMainOnCoordinator() const override { - return coordinator_handler_.PingMainOnCoordinator(); - } #endif private: @@ -1094,11 +1083,18 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param "High availability is experimental feature. Please set MG_EXPERIMENTAL_HIGH_AVAILABILITY compile flag to " "be able to use this functionality."); } + if (!FLAGS_coordinator) { throw QueryRuntimeException("Only coordinator can register coordinator server!"); } + + // TODO: MemoryResource for EvaluationContext, it should probably be passed as + // the argument to Callback. + EvaluationContext evaluation_context{.timestamp = QueryTimestamp(), .parameters = parameters}; + auto evaluator = PrimitiveLiteralExpressionEvaluator{evaluation_context}; auto coordinator_socket_address_tv = coordinator_query->coordinator_socket_address_->Accept(evaluator); auto replication_socket_address_tv = coordinator_query->socket_address_->Accept(evaluator); + callback.fn = [handler = CoordQueryHandler{dbms_handler}, coordinator_socket_address_tv, replication_socket_address_tv, replica_check_frequency = config.replication_replica_check_frequency, @@ -1132,10 +1128,8 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param callback.header = {"name", "socket_address", "alive", "role"}; callback.fn = [handler = CoordQueryHandler{dbms_handler}, replica_nfields = callback.header.size()]() mutable { - const auto main = handler.ShowMainOnCoordinator(); - const auto health_check_main = main ? handler.PingMainOnCoordinator() : std::nullopt; - const auto result_status = handler.ShowMainReplicaStatus( - handler.ShowReplicasOnCoordinator(), handler.PingReplicasOnCoordinator(), main, health_check_main); + const auto result_status = + handler.ShowMainReplicaStatus(handler.ShowReplicasOnCoordinator(), handler.ShowMainOnCoordinator()); std::vector<std::vector<TypedValue>> result{}; result.reserve(result_status.size()); @@ -1166,10 +1160,9 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param callback.header = {"name", "socket_address", "alive", "role"}; callback.fn = [handler = CoordQueryHandler{dbms_handler}]() mutable { handler.DoFailover(); - const auto main = handler.ShowMainOnCoordinator(); - const auto health_check_main = main ? handler.PingMainOnCoordinator() : std::nullopt; - const auto result_status = handler.ShowMainReplicaStatus( - handler.ShowReplicasOnCoordinator(), handler.PingReplicasOnCoordinator(), main, health_check_main); + + const auto result_status = + handler.ShowMainReplicaStatus(handler.ShowReplicasOnCoordinator(), handler.ShowMainOnCoordinator()); std::vector<std::vector<TypedValue>> result{}; result.reserve(result_status.size()); @@ -1180,6 +1173,7 @@ Callback HandleCoordinatorQuery(CoordinatorQuery *coordinator_query, const Param }); return result; }; + notifications->emplace_back(SeverityLevel::INFO, NotificationCode::DO_FAILOVER, "DO FAILOVER called on coordinator."); return callback; diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 4cd7b3992..1c9265006 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -15,7 +15,6 @@ #include <gflags/gflags.h> -#include "coordination/coordinator_entity_info.hpp" #include "dbms/database.hpp" #include "dbms/dbms_handler.hpp" #include "memory/query_memory_control.hpp" @@ -53,6 +52,10 @@ #include "utils/timer.hpp" #include "utils/tsc.hpp" +#ifdef MG_ENTERPRISE +#include "coordination/coordinator_instance_status.hpp" +#endif + namespace memgraph::metrics { extern const Event FailedQuery; extern const Event FailedPrepare; @@ -114,26 +117,18 @@ class CoordinatorQueryHandler { const std::string &instance_name) = 0; /// @throw QueryRuntimeException if an error ocurred. - virtual std::vector<coordination::CoordinatorEntityInfo> ShowReplicasOnCoordinator() const = 0; + virtual std::vector<coordination::CoordinatorInstanceStatus> ShowReplicasOnCoordinator() const = 0; /// @throw QueryRuntimeException if an error ocurred. - virtual std::optional<coordination::CoordinatorEntityInfo> ShowMainOnCoordinator() const = 0; - - /// @throw QueryRuntimeException if an error ocurred. - virtual std::unordered_map<std::string_view, bool> PingReplicasOnCoordinator() const = 0; - - /// @throw QueryRuntimeException if an error ocurred. - virtual std::optional<coordination::CoordinatorEntityHealthInfo> PingMainOnCoordinator() const = 0; + virtual std::optional<coordination::CoordinatorInstanceStatus> ShowMainOnCoordinator() const = 0; /// @throw QueryRuntimeException if an error ocurred. virtual void DoFailover() const = 0; /// @throw QueryRuntimeException if an error ocurred. virtual std::vector<MainReplicaStatus> ShowMainReplicaStatus( - const std::vector<coordination::CoordinatorEntityInfo> &replicas, - const std::unordered_map<std::string_view, bool> &health_check_replicas, - const std::optional<coordination::CoordinatorEntityInfo> &main, - const std::optional<coordination::CoordinatorEntityHealthInfo> &health_check_main) const = 0; + const std::vector<coordination::CoordinatorInstanceStatus> &replicas, + const std::optional<coordination::CoordinatorInstanceStatus> &main) const = 0; #endif }; diff --git a/tests/e2e/high_availability_experimental/client_initiated_failover.py b/tests/e2e/high_availability_experimental/client_initiated_failover.py index f54e67cd6..2368cf8bd 100644 --- a/tests/e2e/high_availability_experimental/client_initiated_failover.py +++ b/tests/e2e/high_availability_experimental/client_initiated_failover.py @@ -155,29 +155,13 @@ def test_simple_client_initiated_failover(connection): def test_failover_fails_all_replicas_down(connection): - # 1. Start all instances - # 2. Kill all replicas - # 3. Kill main - # 4. Run DO FAILOVER on COORDINATOR. Assert exception is being thrown due to all replicas being down - # 5. Assert cluster status didn't change - - # 1. interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) - # 2. interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_1") interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_2") - - # 3. interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "instance_3") coord_cursor = connection(7690, "coordinator").cursor() - # 4. - with pytest.raises(Exception) as e: - execute_and_fetch_all(coord_cursor, "DO FAILOVER;") - assert str(e.value) == "Failover aborted since all replicas are down!" - - # 5. def retrieve_data(): return set(execute_and_fetch_all(coord_cursor, "SHOW REPLICATION CLUSTER;")) @@ -189,16 +173,17 @@ def test_failover_fails_all_replicas_down(connection): } mg_sleep_and_assert(expected_data_on_coord, retrieve_data) + # 4. + with pytest.raises(Exception) as e: + execute_and_fetch_all(coord_cursor, "DO FAILOVER;") + assert str(e.value) == "Failover aborted since all replicas are down!" + + mg_sleep_and_assert(expected_data_on_coord, retrieve_data) + def test_failover_fails_main_is_alive(connection): - # 1. Start all instances - # 2. Run DO FAILOVER on COORDINATOR. Assert exception is being thrown due to main is still live. - # 3. Assert cluster status didn't change - - # 1. interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) - # 2. coord_cursor = connection(7690, "coordinator").cursor() def retrieve_data(): @@ -211,12 +196,10 @@ def test_failover_fails_main_is_alive(connection): } mg_sleep_and_assert(expected_data_on_coord, retrieve_data) - # 4. with pytest.raises(Exception) as e: execute_and_fetch_all(coord_cursor, "DO FAILOVER;") assert str(e.value) == "Failover aborted since main is alive!" - # 5. mg_sleep_and_assert(expected_data_on_coord, retrieve_data) diff --git a/tests/stress/test_config.py b/tests/stress/test_config.py deleted file mode 100644 index 2228be249..000000000 --- a/tests/stress/test_config.py +++ /dev/null @@ -1,194 +0,0 @@ -import itertools -import os -from dataclasses import dataclass -from typing import List - -SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -STATS_FILE = os.path.join(SCRIPT_DIR, ".long_running_stats") - - -class DatasetConstants: - TEST = "test" - OPTIONS = "options" - TIMEOUT = "timeout" - MODE = "mode" - MEMGRAPH_OPTIONS = "memgraph_options" - - -@dataclass -class DatabaseMode: - storage_mode: str - isolation_level: str - - -class StorageModeConstants: - IN_MEMORY_TRANSACTIONAL = "IN_MEMORY_TRANSACTIONAL" - IN_MEMORY_ANALYTICAL = "IN_MEMORY_ANALYTICAL" - - @classmethod - def to_list(cls) -> List[str]: - return [cls.IN_MEMORY_TRANSACTIONAL, cls.IN_MEMORY_ANALYTICAL] - - -class IsolationLevelConstants: - SNAPSHOT_ISOLATION = "SNAPSHOT ISOLATION" - READ_COMMITED = "READ COMMITED" - READ_UNCOMMITED = "READ UNCOMMITED" - - @classmethod - def to_list(cls) -> List[str]: - return [cls.SNAPSHOT_SERIALIZATION, cls.READ_COMMITED, cls.READ_UNCOMMITED] - - -def get_default_database_mode() -> DatabaseMode: - return DatabaseMode(StorageModeConstants.IN_MEMORY_TRANSACTIONAL, IsolationLevelConstants.SNAPSHOT_ISOLATION) - - -def get_all_database_modes() -> List[DatabaseMode]: - return [ - DatabaseMode(x[0], x[1]) - for x in itertools.product(StorageModeConstants.to_list(), IsolationLevelConstants.to_list()) - ] - - -# dataset calibrated for running on Apollo (total 4min) -# bipartite.py runs for approx. 30s -# create_match.py runs for approx. 30s -# long_running runs for 1min -# long_running runs for 2min -SMALL_DATASET = [ - { - DatasetConstants.TEST: "bipartite.py", - DatasetConstants.OPTIONS: ["--u-count", "100", "--v-count", "100"], - DatasetConstants.TIMEOUT: 5, - DatasetConstants.MODE: [get_default_database_mode()], - }, - { - DatasetConstants.TEST: "detach_delete.py", - DatasetConstants.OPTIONS: ["--worker-count", "4", "--repetition-count", "100"], - DatasetConstants.TIMEOUT: 5, - DatasetConstants.MODE: [get_default_database_mode()], - }, - { - DatasetConstants.TEST: "memory_tracker.py", - DatasetConstants.OPTIONS: ["--worker-count", "5", "--repetition-count", "100"], - DatasetConstants.TIMEOUT: 5, - DatasetConstants.MODE: [get_default_database_mode()], - DatasetConstants.MEMGRAPH_OPTIONS: ["--memory-limit=2048"], - }, - { - DatasetConstants.TEST: "memory_limit.py", - DatasetConstants.OPTIONS: ["--worker-count", "5", "--repetition-count", "100"], - DatasetConstants.TIMEOUT: 5, - DatasetConstants.MODE: [get_default_database_mode()], - DatasetConstants.MEMGRAPH_OPTIONS: ["--memory-limit=2048"], - }, - { - DatasetConstants.TEST: "create_match.py", - DatasetConstants.OPTIONS: ["--vertex-count", "40000", "--create-pack-size", "100"], - DatasetConstants.TIMEOUT: 5, - DatasetConstants.MODE: [get_default_database_mode()], - }, - { - DatasetConstants.TEST: "parser.cpp", - DatasetConstants.OPTIONS: ["--per-worker-query-count", "1000"], - DatasetConstants.TIMEOUT: 5, - DatasetConstants.MODE: [get_default_database_mode()], - }, - { - DatasetConstants.TEST: "long_running.cpp", - DatasetConstants.OPTIONS: [ - "--vertex-count", - "1000", - "--edge-count", - "5000", - "--max-time", - "1", - "--verify", - "20", - ], - DatasetConstants.TIMEOUT: 5, - DatasetConstants.MODE: [get_default_database_mode()], - }, - { - DatasetConstants.TEST: "long_running.cpp", - DatasetConstants.OPTIONS: [ - "--vertex-count", - "10000", - "--edge-count", - "50000", - "--max-time", - "2", - "--verify", - "30", - "--stats-file", - STATS_FILE, - ], - DatasetConstants.TIMEOUT: 5, - DatasetConstants.MODE: [get_default_database_mode()], - }, -] - -# dataset calibrated for running on daily stress instance (total 9h) -# bipartite.py and create_match.py run for approx. 15min -# long_running runs for 5min x 6 times = 30min -# long_running runs for 8h -LARGE_DATASET = ( - [ - { - DatasetConstants.TEST: "bipartite.py", - DatasetConstants.OPTIONS: ["--u-count", "300", "--v-count", "300"], - DatasetConstants.TIMEOUT: 30, - DatasetConstants.MODE: [get_default_database_mode()], - }, - { - DatasetConstants.TEST: "detach_delete.py", - DatasetConstants.OPTIONS: ["--worker-count", "4", "--repetition-count", "300"], - DatasetConstants.TIMEOUT: 5, - DatasetConstants.MODE: [get_default_database_mode()], - }, - { - DatasetConstants.TEST: "create_match.py", - DatasetConstants.OPTIONS: ["--vertex-count", "500000", "--create-pack-size", "500"], - DatasetConstants.TIMEOUT: 30, - DatasetConstants.MODE: [get_default_database_mode()], - }, - ] - + [ - { - DatasetConstants.TEST: "long_running.cpp", - DatasetConstants.OPTIONS: [ - "--vertex-count", - "10000", - "--edge-count", - "40000", - "--max-time", - "5", - "--verify", - "60", - ], - DatasetConstants.TIMEOUT: 16, - DatasetConstants.MODE: [get_default_database_mode()], - }, - ] - * 6 - + [ - { - DatasetConstants.TEST: "long_running.cpp", - DatasetConstants.OPTIONS: [ - "--vertex-count", - "200000", - "--edge-count", - "1000000", - "--max-time", - "480", - "--verify", - "300", - "--stats-file", - STATS_FILE, - ], - DatasetConstants.TIMEOUT: 500, - DatasetConstants.MODE: [get_default_database_mode()], - }, - ] -)