Replace Client with HAClient in feature benchmark
Reviewers: msantl, ipaljak, mferencevic Reviewed By: msantl, mferencevic Subscribers: mferencevic, teon.banek, pullbot Differential Revision: https://phabricator.memgraph.io/D1969
This commit is contained in:
parent
9307cdc7ac
commit
05e4985fd5
@ -8,7 +8,7 @@
|
||||
#include <fmt/format.h>
|
||||
#include <gflags/gflags.h>
|
||||
|
||||
#include "communication/bolt/client.hpp"
|
||||
#include "communication/bolt/ha_client.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/utils.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
@ -29,38 +29,6 @@ DEFINE_string(output_file, "", "Output file where the results should be.");
|
||||
DEFINE_int32(nodes, 1000, "Number of nodes in DB");
|
||||
DEFINE_int32(edges, 5000, "Number of edges in DB");
|
||||
|
||||
std::optional<io::network::Endpoint> GetLeaderEndpoint() {
|
||||
for (int retry = 0; retry < 10; ++retry) {
|
||||
for (int i = 0; i < FLAGS_cluster_size; ++i) {
|
||||
try {
|
||||
communication::ClientContext context(FLAGS_use_ssl);
|
||||
communication::bolt::Client client(&context);
|
||||
|
||||
uint16_t port = FLAGS_port + i;
|
||||
io::network::Endpoint endpoint{FLAGS_address, port};
|
||||
|
||||
client.Connect(endpoint, FLAGS_username, FLAGS_password);
|
||||
client.Execute("MATCH (n) RETURN n", {});
|
||||
client.Close();
|
||||
|
||||
// If we succeeded with the above query, we found the current leader.
|
||||
return std::make_optional(endpoint);
|
||||
|
||||
} catch (const communication::bolt::ClientQueryException &) {
|
||||
// This one is not the leader, continue.
|
||||
continue;
|
||||
} catch (const communication::bolt::ClientFatalException &) {
|
||||
// This one seems to be down, continue.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
LOG(INFO) << "Couldn't find Raft cluster leader, retrying...";
|
||||
std::this_thread::sleep_for(1s);
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::SetUsageMessage("Memgraph HA read benchmark client");
|
||||
@ -68,16 +36,21 @@ int main(int argc, char **argv) {
|
||||
|
||||
std::atomic<int64_t> query_counter{0};
|
||||
|
||||
auto leader_endpoint = GetLeaderEndpoint();
|
||||
if (!leader_endpoint) {
|
||||
LOG(ERROR) << "Couldn't find Raft cluster leader!";
|
||||
return 1;
|
||||
std::vector<io::network::Endpoint> endpoints;
|
||||
for (int i = 0; i < FLAGS_cluster_size; ++i) {
|
||||
uint16_t port = FLAGS_port + i;
|
||||
io::network::Endpoint endpoint{FLAGS_address, port};
|
||||
endpoints.push_back(endpoint);
|
||||
}
|
||||
|
||||
// populate the db (random graph with given number of nodes and edges)
|
||||
// Populate the db (random graph with given number of nodes and edges)
|
||||
// Raft reelection constants are between 300ms to 500ms, so we
|
||||
// use 1000ms to avoid using up all retries unnecessarily during reelection.
|
||||
std::chrono::milliseconds retry_delay(1000);
|
||||
communication::ClientContext context(FLAGS_use_ssl);
|
||||
communication::bolt::Client client(&context);
|
||||
client.Connect(*leader_endpoint, FLAGS_username, FLAGS_password);
|
||||
communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
|
||||
FLAGS_password, 10, retry_delay);
|
||||
|
||||
for (int i = 0; i < FLAGS_nodes; ++i) {
|
||||
client.Execute("CREATE (:Node {id:" + std::to_string(i) + "})", {});
|
||||
}
|
||||
@ -101,12 +74,12 @@ int main(int argc, char **argv) {
|
||||
thread_duration.resize(num_threads);
|
||||
|
||||
for (int i = 0; i < num_threads; ++i) {
|
||||
threads.emplace_back([i, endpoint = *leader_endpoint, &query_counter,
|
||||
threads.emplace_back([i, &endpoints, &query_counter, retry_delay,
|
||||
&local_duration = thread_duration[i]]() {
|
||||
utils::ThreadSetName(fmt::format("BenchWriter{}", i));
|
||||
communication::ClientContext context(FLAGS_use_ssl);
|
||||
communication::bolt::Client client(&context);
|
||||
client.Connect(endpoint, FLAGS_username, FLAGS_password);
|
||||
communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
|
||||
FLAGS_password, 10, retry_delay);
|
||||
|
||||
auto seed =
|
||||
std::chrono::high_resolution_clock::now().time_since_epoch().count();
|
||||
@ -131,8 +104,6 @@ int main(int argc, char **argv) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
client.Close();
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -7,7 +7,7 @@
|
||||
#include <fmt/format.h>
|
||||
#include <gflags/gflags.h>
|
||||
|
||||
#include "communication/bolt/client.hpp"
|
||||
#include "communication/bolt/ha_client.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/utils.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
@ -26,38 +26,6 @@ DEFINE_double(duration, 10.0,
|
||||
"How long should the client perform writes (seconds)");
|
||||
DEFINE_string(output_file, "", "Output file where the results should be.");
|
||||
|
||||
std::optional<io::network::Endpoint> GetLeaderEndpoint() {
|
||||
for (int retry = 0; retry < 10; ++retry) {
|
||||
for (int i = 0; i < FLAGS_cluster_size; ++i) {
|
||||
try {
|
||||
communication::ClientContext context(FLAGS_use_ssl);
|
||||
communication::bolt::Client client(&context);
|
||||
|
||||
uint16_t port = FLAGS_port + i;
|
||||
io::network::Endpoint endpoint{FLAGS_address, port};
|
||||
|
||||
client.Connect(endpoint, FLAGS_username, FLAGS_password);
|
||||
client.Execute("MATCH (n) RETURN n", {});
|
||||
client.Close();
|
||||
|
||||
// If we succeeded with the above query, we found the current leader.
|
||||
return std::make_optional(endpoint);
|
||||
|
||||
} catch (const communication::bolt::ClientQueryException &) {
|
||||
// This one is not the leader, continue.
|
||||
continue;
|
||||
} catch (const communication::bolt::ClientFatalException &) {
|
||||
// This one seems to be down, continue.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
LOG(INFO) << "Couldn't find Raft cluster leader, retrying...";
|
||||
std::this_thread::sleep_for(1s);
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::SetUsageMessage("Memgraph HA write benchmark client");
|
||||
@ -65,10 +33,11 @@ int main(int argc, char **argv) {
|
||||
|
||||
std::atomic<int64_t> query_counter{0};
|
||||
|
||||
auto leader_endpoint = GetLeaderEndpoint();
|
||||
if (!leader_endpoint) {
|
||||
LOG(ERROR) << "Couldn't find Raft cluster leader!";
|
||||
return 1;
|
||||
std::vector<io::network::Endpoint> endpoints;
|
||||
for (int i = 0; i < FLAGS_cluster_size; ++i) {
|
||||
uint16_t port = FLAGS_port + i;
|
||||
io::network::Endpoint endpoint{FLAGS_address, port};
|
||||
endpoints.push_back(endpoint);
|
||||
}
|
||||
|
||||
const int num_threads = std::thread::hardware_concurrency();
|
||||
@ -78,12 +47,13 @@ int main(int argc, char **argv) {
|
||||
thread_duration.resize(num_threads);
|
||||
|
||||
for (int i = 0; i < num_threads; ++i) {
|
||||
threads.emplace_back([i, endpoint = *leader_endpoint, &query_counter,
|
||||
threads.emplace_back([i, endpoints = endpoints, &query_counter,
|
||||
&local_duration = thread_duration[i]]() {
|
||||
utils::ThreadSetName(fmt::format("BenchWriter{}", i));
|
||||
communication::ClientContext context(FLAGS_use_ssl);
|
||||
communication::bolt::Client client(&context);
|
||||
client.Connect(endpoint, FLAGS_username, FLAGS_password);
|
||||
std::chrono::milliseconds retry_delay(1000);
|
||||
communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
|
||||
FLAGS_password, 10, retry_delay);
|
||||
|
||||
utils::Timer t;
|
||||
while (true) {
|
||||
@ -101,8 +71,6 @@ int main(int argc, char **argv) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
client.Close();
|
||||
});
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user