diff --git a/tests/integration/ha/log_compaction/runner.py b/tests/integration/ha/log_compaction/runner.py index 7788cbd21..f2a31f0c8 100755 --- a/tests/integration/ha/log_compaction/runner.py +++ b/tests/integration/ha/log_compaction/runner.py @@ -28,7 +28,7 @@ class HaLogCompactionTest(HaTestBase): def execute_step(self, query): client = subprocess.Popen( [self.tester_binary, "--cluster_size", str(self.cluster_size)], - stdin=subprocess.PIPE) + stdin=subprocess.PIPE, stdout=subprocess.DEVNULL) try: client.communicate(input=bytes(query, "UTF-8"), timeout=30) @@ -93,7 +93,7 @@ if __name__ == "__main__": args = parser.parse_args() for cluster_size in [3, 5]: - print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size)) + print("\033[1;36m~~ Executing test with cluster size: %d ~~\033[0m" % (cluster_size)) HaLogCompactionTest( args.memgraph, tester_binary, args.raft_config_file, cluster_size) print("\033[1;32m~~ The test finished successfully ~~\033[0m") diff --git a/tests/manual/ha_client.cpp b/tests/manual/ha_client.cpp index 819ee697e..b38e3d68c 100644 --- a/tests/manual/ha_client.cpp +++ b/tests/manual/ha_client.cpp @@ -6,7 +6,7 @@ #include <gflags/gflags.h> #include <glog/logging.h> -#include "communication/bolt/client.hpp" +#include "communication/bolt/ha_client.hpp" #include "io/network/endpoint.hpp" #include "io/network/utils.hpp" #include "utils/timer.hpp" @@ -17,90 +17,65 @@ DEFINE_int32(cluster_size, 3, "Size of the raft cluster."); DEFINE_string(username, "", "Username for the database"); DEFINE_string(password, "", "Password for the database"); DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); +DEFINE_bool(print_records, true, + "Set to false to disable printing of records."); +DEFINE_int32(num_retries, 3, + "Number of retries for each operation (execute/connect)."); +DEFINE_int32(retry_delay_ms, 1000, "Delay before retrying in ms."); -using namespace std::chrono_literals; - -void Execute(const std::vector<std::string> &queries) { - communication::ClientContext context(FLAGS_use_ssl); - std::optional<communication::bolt::Client> client; - communication::bolt::QueryData result; - - for (size_t k = 0; k < queries.size();) { - if (!client) { - // Find the leader by executing query on all machines until one responds - // with success. - for (int retry = 0; !client && retry < 10; ++retry) { - for (int i = 0; !client && i < FLAGS_cluster_size; ++i) { - try { - client.emplace(&context); - - uint16_t port = FLAGS_port + i; - io::network::Endpoint endpoint{FLAGS_address, port}; - - client->Connect(endpoint, FLAGS_username, FLAGS_password); - result = client->Execute(queries[k], {}); - } catch (const communication::bolt::ClientQueryException &) { - // This one is not the leader, continue. - client = std::nullopt; - continue; - } catch (const communication::bolt::ClientFatalException &) { - // This one seems to be down, continue. - client = std::nullopt; - continue; - } - } - - if (!client) { - LOG(INFO) << "Couldn't find Raft cluster leader, retrying..."; - std::this_thread::sleep_for(1s); - } - } - - if (!client) { - LOG(ERROR) << "Couldn't find Raft cluster leader."; - return; - } - - } else { - // Try reusing the previous client. - try { - result = client->Execute(queries[k], {}); - } catch (const communication::bolt::ClientQueryException &) { - client = std::nullopt; - continue; - } catch (const communication::bolt::ClientFatalException &e) { - client = std::nullopt; - continue; - } - } - - if (result.records.size() > 0) { - std::cout << "Results: " << std::endl; - for (auto &record : result.records) { - std::cout << record << std::endl; - } - } - - k += 1; - } - - return; -} - +// NOLINTNEXTLINE(bugprone-exception-escape) int main(int argc, char **argv) { gflags::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); communication::Init(); - std::vector<std::string> queries; - - std::string line; - while (std::getline(std::cin, line)) { - queries.emplace_back(std::move(line)); + std::vector<io::network::Endpoint> endpoints; + endpoints.reserve(FLAGS_cluster_size); + for (int i = 0; i < FLAGS_cluster_size; ++i) { + endpoints.push_back({FLAGS_address, static_cast<uint16_t>(FLAGS_port + i)}); } - Execute(queries); + communication::ClientContext context(FLAGS_use_ssl); + communication::bolt::HAClient client( + endpoints, &context, FLAGS_username, FLAGS_password, FLAGS_num_retries, + std::chrono::milliseconds(FLAGS_retry_delay_ms)); + + while (true) { + std::string s; + std::getline(std::cin, s); + if (s == "") { + break; + } + try { + utils::Timer t; + auto ret = client.Execute(s, {}); + auto elapsed = t.Elapsed().count(); + std::cout << "Wall time:\n " << elapsed << std::endl; + + std::cout << "Fields:" << std::endl; + for (auto &field : ret.fields) { + std::cout << " " << field << std::endl; + } + + if (FLAGS_print_records) { + std::cout << "Records:" << std::endl; + for (int i = 0; i < static_cast<int>(ret.records.size()); ++i) { + std::cout << " " << i << std::endl; + for (auto &value : ret.records[i]) { + std::cout << " " << value << std::endl; + } + } + } + + std::cout << "Metadata:" << std::endl; + for (auto &data : ret.metadata) { + std::cout << " " << data.first << " : " << data.second << std::endl; + } + } catch (const communication::bolt::ClientQueryException &e) { + std::cout << "Client received exception: " << e.what() << std::endl; + } + } return 0; }