From 05e4985fd5dc311823e6de2585bde28d526d47a7 Mon Sep 17 00:00:00 2001 From: dlozic Date: Mon, 29 Apr 2019 16:38:00 +0200 Subject: [PATCH] Replace Client with HAClient in feature benchmark Reviewers: msantl, ipaljak, mferencevic Reviewed By: msantl, mferencevic Subscribers: mferencevic, teon.banek, pullbot Differential Revision: https://phabricator.memgraph.io/D1969 --- tests/feature_benchmark/ha/read/benchmark.cpp | 61 +++++-------------- .../feature_benchmark/ha/write/benchmark.cpp | 52 +++------------- 2 files changed, 26 insertions(+), 87 deletions(-) diff --git a/tests/feature_benchmark/ha/read/benchmark.cpp b/tests/feature_benchmark/ha/read/benchmark.cpp index c137e73d3..877f2a33b 100644 --- a/tests/feature_benchmark/ha/read/benchmark.cpp +++ b/tests/feature_benchmark/ha/read/benchmark.cpp @@ -8,7 +8,7 @@ #include #include -#include "communication/bolt/client.hpp" +#include "communication/bolt/ha_client.hpp" #include "io/network/endpoint.hpp" #include "io/network/utils.hpp" #include "utils/flag_validation.hpp" @@ -29,38 +29,6 @@ DEFINE_string(output_file, "", "Output file where the results should be."); DEFINE_int32(nodes, 1000, "Number of nodes in DB"); DEFINE_int32(edges, 5000, "Number of edges in DB"); -std::optional GetLeaderEndpoint() { - for (int retry = 0; retry < 10; ++retry) { - for (int i = 0; i < FLAGS_cluster_size; ++i) { - try { - communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::Client client(&context); - - uint16_t port = FLAGS_port + i; - io::network::Endpoint endpoint{FLAGS_address, port}; - - client.Connect(endpoint, FLAGS_username, FLAGS_password); - client.Execute("MATCH (n) RETURN n", {}); - client.Close(); - - // If we succeeded with the above query, we found the current leader. - return std::make_optional(endpoint); - - } catch (const communication::bolt::ClientQueryException &) { - // This one is not the leader, continue. - continue; - } catch (const communication::bolt::ClientFatalException &) { - // This one seems to be down, continue. - continue; - } - } - LOG(INFO) << "Couldn't find Raft cluster leader, retrying..."; - std::this_thread::sleep_for(1s); - } - - return std::nullopt; -} - int main(int argc, char **argv) { gflags::ParseCommandLineFlags(&argc, &argv, true); google::SetUsageMessage("Memgraph HA read benchmark client"); @@ -68,16 +36,21 @@ int main(int argc, char **argv) { std::atomic query_counter{0}; - auto leader_endpoint = GetLeaderEndpoint(); - if (!leader_endpoint) { - LOG(ERROR) << "Couldn't find Raft cluster leader!"; - return 1; + std::vector endpoints; + for (int i = 0; i < FLAGS_cluster_size; ++i) { + uint16_t port = FLAGS_port + i; + io::network::Endpoint endpoint{FLAGS_address, port}; + endpoints.push_back(endpoint); } - // populate the db (random graph with given number of nodes and edges) + // Populate the db (random graph with given number of nodes and edges) + // Raft reelection constants are between 300ms to 500ms, so we + // use 1000ms to avoid using up all retries unnecessarily during reelection. + std::chrono::milliseconds retry_delay(1000); communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::Client client(&context); - client.Connect(*leader_endpoint, FLAGS_username, FLAGS_password); + communication::bolt::HAClient client(endpoints, &context, FLAGS_username, + FLAGS_password, 10, retry_delay); + for (int i = 0; i < FLAGS_nodes; ++i) { client.Execute("CREATE (:Node {id:" + std::to_string(i) + "})", {}); } @@ -101,12 +74,12 @@ int main(int argc, char **argv) { thread_duration.resize(num_threads); for (int i = 0; i < num_threads; ++i) { - threads.emplace_back([i, endpoint = *leader_endpoint, &query_counter, + threads.emplace_back([i, &endpoints, &query_counter, retry_delay, &local_duration = thread_duration[i]]() { utils::ThreadSetName(fmt::format("BenchWriter{}", i)); communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::Client client(&context); - client.Connect(endpoint, FLAGS_username, FLAGS_password); + communication::bolt::HAClient client(endpoints, &context, FLAGS_username, + FLAGS_password, 10, retry_delay); auto seed = std::chrono::high_resolution_clock::now().time_since_epoch().count(); @@ -131,8 +104,6 @@ int main(int argc, char **argv) { break; } } - - client.Close(); }); } diff --git a/tests/feature_benchmark/ha/write/benchmark.cpp b/tests/feature_benchmark/ha/write/benchmark.cpp index 1045b6525..55063f0e7 100644 --- a/tests/feature_benchmark/ha/write/benchmark.cpp +++ b/tests/feature_benchmark/ha/write/benchmark.cpp @@ -7,7 +7,7 @@ #include #include -#include "communication/bolt/client.hpp" +#include "communication/bolt/ha_client.hpp" #include "io/network/endpoint.hpp" #include "io/network/utils.hpp" #include "utils/flag_validation.hpp" @@ -26,38 +26,6 @@ DEFINE_double(duration, 10.0, "How long should the client perform writes (seconds)"); DEFINE_string(output_file, "", "Output file where the results should be."); -std::optional GetLeaderEndpoint() { - for (int retry = 0; retry < 10; ++retry) { - for (int i = 0; i < FLAGS_cluster_size; ++i) { - try { - communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::Client client(&context); - - uint16_t port = FLAGS_port + i; - io::network::Endpoint endpoint{FLAGS_address, port}; - - client.Connect(endpoint, FLAGS_username, FLAGS_password); - client.Execute("MATCH (n) RETURN n", {}); - client.Close(); - - // If we succeeded with the above query, we found the current leader. - return std::make_optional(endpoint); - - } catch (const communication::bolt::ClientQueryException &) { - // This one is not the leader, continue. - continue; - } catch (const communication::bolt::ClientFatalException &) { - // This one seems to be down, continue. - continue; - } - } - LOG(INFO) << "Couldn't find Raft cluster leader, retrying..."; - std::this_thread::sleep_for(1s); - } - - return std::nullopt; -} - int main(int argc, char **argv) { gflags::ParseCommandLineFlags(&argc, &argv, true); google::SetUsageMessage("Memgraph HA write benchmark client"); @@ -65,10 +33,11 @@ int main(int argc, char **argv) { std::atomic query_counter{0}; - auto leader_endpoint = GetLeaderEndpoint(); - if (!leader_endpoint) { - LOG(ERROR) << "Couldn't find Raft cluster leader!"; - return 1; + std::vector endpoints; + for (int i = 0; i < FLAGS_cluster_size; ++i) { + uint16_t port = FLAGS_port + i; + io::network::Endpoint endpoint{FLAGS_address, port}; + endpoints.push_back(endpoint); } const int num_threads = std::thread::hardware_concurrency(); @@ -78,12 +47,13 @@ int main(int argc, char **argv) { thread_duration.resize(num_threads); for (int i = 0; i < num_threads; ++i) { - threads.emplace_back([i, endpoint = *leader_endpoint, &query_counter, + threads.emplace_back([i, endpoints = endpoints, &query_counter, &local_duration = thread_duration[i]]() { utils::ThreadSetName(fmt::format("BenchWriter{}", i)); communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::Client client(&context); - client.Connect(endpoint, FLAGS_username, FLAGS_password); + std::chrono::milliseconds retry_delay(1000); + communication::bolt::HAClient client(endpoints, &context, FLAGS_username, + FLAGS_password, 10, retry_delay); utils::Timer t; while (true) { @@ -101,8 +71,6 @@ int main(int argc, char **argv) { break; } } - - client.Close(); }); }