Make manual HA client use the new Bolt client

Reviewers: msantl, ipaljak

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1976
This commit is contained in:
Matej Ferencevic 2019-04-25 14:12:42 +02:00
parent 6182312e3d
commit c3cfe876c9
2 changed files with 53 additions and 78 deletions

View File

@ -28,7 +28,7 @@ class HaLogCompactionTest(HaTestBase):
def execute_step(self, query):
client = subprocess.Popen(
[self.tester_binary, "--cluster_size", str(self.cluster_size)],
stdin=subprocess.PIPE)
stdin=subprocess.PIPE, stdout=subprocess.DEVNULL)
try:
client.communicate(input=bytes(query, "UTF-8"), timeout=30)
@ -93,7 +93,7 @@ if __name__ == "__main__":
args = parser.parse_args()
for cluster_size in [3, 5]:
print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size))
print("\033[1;36m~~ Executing test with cluster size: %d ~~\033[0m" % (cluster_size))
HaLogCompactionTest(
args.memgraph, tester_binary, args.raft_config_file, cluster_size)
print("\033[1;32m~~ The test finished successfully ~~\033[0m")

View File

@ -6,7 +6,7 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/bolt/client.hpp"
#include "communication/bolt/ha_client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
#include "utils/timer.hpp"
@ -17,90 +17,65 @@ DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
DEFINE_bool(print_records, true,
"Set to false to disable printing of records.");
DEFINE_int32(num_retries, 3,
"Number of retries for each operation (execute/connect).");
DEFINE_int32(retry_delay_ms, 1000, "Delay before retrying in ms.");
using namespace std::chrono_literals;
void Execute(const std::vector<std::string> &queries) {
communication::ClientContext context(FLAGS_use_ssl);
std::optional<communication::bolt::Client> client;
communication::bolt::QueryData result;
for (size_t k = 0; k < queries.size();) {
if (!client) {
// Find the leader by executing query on all machines until one responds
// with success.
for (int retry = 0; !client && retry < 10; ++retry) {
for (int i = 0; !client && i < FLAGS_cluster_size; ++i) {
try {
client.emplace(&context);
uint16_t port = FLAGS_port + i;
io::network::Endpoint endpoint{FLAGS_address, port};
client->Connect(endpoint, FLAGS_username, FLAGS_password);
result = client->Execute(queries[k], {});
} catch (const communication::bolt::ClientQueryException &) {
// This one is not the leader, continue.
client = std::nullopt;
continue;
} catch (const communication::bolt::ClientFatalException &) {
// This one seems to be down, continue.
client = std::nullopt;
continue;
}
}
if (!client) {
LOG(INFO) << "Couldn't find Raft cluster leader, retrying...";
std::this_thread::sleep_for(1s);
}
}
if (!client) {
LOG(ERROR) << "Couldn't find Raft cluster leader.";
return;
}
} else {
// Try reusing the previous client.
try {
result = client->Execute(queries[k], {});
} catch (const communication::bolt::ClientQueryException &) {
client = std::nullopt;
continue;
} catch (const communication::bolt::ClientFatalException &e) {
client = std::nullopt;
continue;
}
}
if (result.records.size() > 0) {
std::cout << "Results: " << std::endl;
for (auto &record : result.records) {
std::cout << record << std::endl;
}
}
k += 1;
}
return;
}
// NOLINTNEXTLINE(bugprone-exception-escape)
int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
communication::Init();
std::vector<std::string> queries;
std::string line;
while (std::getline(std::cin, line)) {
queries.emplace_back(std::move(line));
std::vector<io::network::Endpoint> endpoints;
endpoints.reserve(FLAGS_cluster_size);
for (int i = 0; i < FLAGS_cluster_size; ++i) {
endpoints.push_back({FLAGS_address, static_cast<uint16_t>(FLAGS_port + i)});
}
Execute(queries);
communication::ClientContext context(FLAGS_use_ssl);
communication::bolt::HAClient client(
endpoints, &context, FLAGS_username, FLAGS_password, FLAGS_num_retries,
std::chrono::milliseconds(FLAGS_retry_delay_ms));
while (true) {
std::string s;
std::getline(std::cin, s);
if (s == "") {
break;
}
try {
utils::Timer t;
auto ret = client.Execute(s, {});
auto elapsed = t.Elapsed().count();
std::cout << "Wall time:\n " << elapsed << std::endl;
std::cout << "Fields:" << std::endl;
for (auto &field : ret.fields) {
std::cout << " " << field << std::endl;
}
if (FLAGS_print_records) {
std::cout << "Records:" << std::endl;
for (int i = 0; i < static_cast<int>(ret.records.size()); ++i) {
std::cout << " " << i << std::endl;
for (auto &value : ret.records[i]) {
std::cout << " " << value << std::endl;
}
}
}
std::cout << "Metadata:" << std::endl;
for (auto &data : ret.metadata) {
std::cout << " " << data.first << " : " << data.second << std::endl;
}
} catch (const communication::bolt::ClientQueryException &e) {
std::cout << "Client received exception: " << e.what() << std::endl;
}
}
return 0;
}