2023-02-18 00:54:05 +08:00
|
|
|
// Copyright 2023 Memgraph Ltd.
|
2021-10-26 14:53:56 +08:00
|
|
|
//
|
|
|
|
// Use of this software is governed by the Business Source License
|
|
|
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
|
|
|
// License, and you may not use this file except in compliance with the Business Source License.
|
|
|
|
//
|
|
|
|
// As of the Change Date specified in that file, in accordance with
|
|
|
|
// the Business Source License, use of this software will be governed
|
|
|
|
// by the Apache License, Version 2.0, included in the file
|
|
|
|
// licenses/APL.txt.
|
|
|
|
|
2020-09-23 00:55:28 +08:00
|
|
|
#include <algorithm>
|
|
|
|
#include <atomic>
|
|
|
|
#include <chrono>
|
|
|
|
#include <filesystem>
|
|
|
|
#include <fstream>
|
|
|
|
#include <limits>
|
|
|
|
#include <map>
|
2023-02-18 00:54:05 +08:00
|
|
|
#include <numeric>
|
|
|
|
#include <ostream>
|
2020-09-23 00:55:28 +08:00
|
|
|
#include <string>
|
|
|
|
#include <thread>
|
|
|
|
#include <vector>
|
|
|
|
|
|
|
|
#include <gflags/gflags.h>
|
2023-02-18 00:54:05 +08:00
|
|
|
#include <math.h>
|
2020-09-23 00:55:28 +08:00
|
|
|
#include <json/json.hpp>
|
|
|
|
|
|
|
|
#include "communication/bolt/client.hpp"
|
|
|
|
#include "communication/bolt/v1/value.hpp"
|
2020-10-20 18:55:13 +08:00
|
|
|
#include "communication/init.hpp"
|
2023-02-18 00:54:05 +08:00
|
|
|
#include "spdlog/formatter.h"
|
|
|
|
#include "spdlog/spdlog.h"
|
2020-09-23 00:55:28 +08:00
|
|
|
#include "utils/exceptions.hpp"
|
2023-02-18 00:54:05 +08:00
|
|
|
#include "utils/logging.hpp"
|
2020-09-23 00:55:28 +08:00
|
|
|
#include "utils/string.hpp"
|
|
|
|
#include "utils/timer.hpp"
|
|
|
|
|
|
|
|
// Command-line flags (gflags). Each DEFINE_* creates a FLAGS_<name> global that the rest of the file reads.

// Connection settings.
DEFINE_string(address, "127.0.0.1", "Server address.");
DEFINE_int32(port, 7687, "Server port.");
DEFINE_string(username, "", "Username for the database.");
DEFINE_string(password, "", "Password for the database.");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");

// Execution settings.
DEFINE_uint64(num_workers, 1,
              "Number of workers that should be used to concurrently execute "
              "the supplied queries.");
DEFINE_uint64(max_retries, 50, "Maximum number of retries for each query.");
DEFINE_bool(queries_json, false,
            "Set to true to load all queries as a single JSON encoded list. Each item "
            "in the list should contain another list whose first element is the query "
            "that should be executed and the second element should be a dictionary of "
            "query parameters for that query.");

// Input/output settings.
DEFINE_string(input, "", "Input file. By default stdin is used.");
DEFINE_string(output, "", "Output file. By default stdout is used.");

// Execution modes. Adjacent string literals are concatenated verbatim, so every fragment that is
// followed by another one must end with an explicit space.
DEFINE_bool(validation, false,
            "Set to true to run client in validation mode. "
            "Validation mode works for single query and returns results for validation "
            "with metadata");
DEFINE_int64(time_dependent_execution, 0,
             "Time-dependent executions execute the queries for a specified number of seconds. "
             "If all queries are executed, and there is still time, queries are rerun again. "
             "If the time runs out, the client is done with the job and returning results.");
|
2020-09-23 00:55:28 +08:00
|
|
|
|
2022-02-22 20:33:45 +08:00
|
|
|
std::pair<std::map<std::string, memgraph::communication::bolt::Value>, uint64_t> ExecuteNTimesTillSuccess(
|
|
|
|
memgraph::communication::bolt::Client *client, const std::string &query,
|
|
|
|
const std::map<std::string, memgraph::communication::bolt::Value> ¶ms, int max_attempts) {
|
2020-09-23 00:55:28 +08:00
|
|
|
for (uint64_t i = 0; i < max_attempts; ++i) {
|
|
|
|
try {
|
|
|
|
auto ret = client->Execute(query, params);
|
2023-02-18 00:54:05 +08:00
|
|
|
|
2020-09-23 00:55:28 +08:00
|
|
|
return {std::move(ret.metadata), i};
|
2022-02-22 20:33:45 +08:00
|
|
|
} catch (const memgraph::utils::BasicException &e) {
|
2020-09-23 00:55:28 +08:00
|
|
|
if (i == max_attempts - 1) {
|
2021-02-18 22:32:43 +08:00
|
|
|
LOG_FATAL("Could not execute query '{}' {} times! Error message: {}", query, max_attempts, e.what());
|
2020-09-23 00:55:28 +08:00
|
|
|
} else {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-01-21 22:47:56 +08:00
|
|
|
LOG_FATAL("Could not execute query '{}' {} times!", query, max_attempts);
|
2020-09-23 00:55:28 +08:00
|
|
|
}
|
|
|
|
|
2023-02-18 00:54:05 +08:00
|
|
|
// Validation returns results and metadata
|
|
|
|
std::pair<std::map<std::string, memgraph::communication::bolt::Value>,
|
|
|
|
std::vector<std::vector<memgraph::communication::bolt::Value>>>
|
|
|
|
ExecuteValidationNTimesTillSuccess(memgraph::communication::bolt::Client *client, const std::string &query,
|
|
|
|
const std::map<std::string, memgraph::communication::bolt::Value> ¶ms,
|
|
|
|
int max_attempts) {
|
|
|
|
for (uint64_t i = 0; i < max_attempts; ++i) {
|
|
|
|
try {
|
|
|
|
auto ret = client->Execute(query, params);
|
|
|
|
|
|
|
|
return {std::move(ret.metadata), std::move(ret.records)};
|
|
|
|
} catch (const memgraph::utils::BasicException &e) {
|
|
|
|
if (i == max_attempts - 1) {
|
|
|
|
LOG_FATAL("Could not execute query '{}' {} times! Error message: {}", query, max_attempts, e.what());
|
|
|
|
} else {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
LOG_FATAL("Could not execute query '{}' {} times!", query, max_attempts);
|
|
|
|
}
|
|
|
|
|
2022-02-22 20:33:45 +08:00
|
|
|
/// Recursively converts a parsed JSON value into a Bolt value.
///
/// Scalars (null, boolean, string, signed/unsigned integer, float) are converted directly; unsigned
/// integers are cast to int64_t. Arrays become a vector of converted elements and objects become a
/// string-keyed map of converted members. Binary and discarded JSON values are reported through
/// LOG_FATAL.
memgraph::communication::bolt::Value JsonToBoltValue(const nlohmann::json &data) {
  using BoltValue = memgraph::communication::bolt::Value;
  using JsonType = nlohmann::json::value_t;

  switch (data.type()) {
    case JsonType::null:
      return BoltValue();
    case JsonType::boolean:
      return BoltValue(data.get<bool>());
    case JsonType::string:
      return BoltValue(data.get<std::string>());
    case JsonType::number_integer:
      return BoltValue(data.get<int64_t>());
    case JsonType::number_unsigned:
      return BoltValue(static_cast<int64_t>(data.get<uint64_t>()));
    case JsonType::number_float:
      return BoltValue(data.get<double>());
    case JsonType::array: {
      const nlohmann::json::array_t elements = data.get<nlohmann::json::array_t>();
      std::vector<BoltValue> converted;
      converted.reserve(data.size());
      for (const auto &element : elements) {
        converted.emplace_back(JsonToBoltValue(element));
      }
      return BoltValue(std::move(converted));
    }
    case JsonType::object: {
      const nlohmann::json::object_t members = data.get<nlohmann::json::object_t>();
      std::map<std::string, BoltValue> converted;
      for (const auto &[key, member] : members) {
        converted.emplace(key, JsonToBoltValue(member));
      }
      return BoltValue(std::move(converted));
    }
    case JsonType::binary:
    case JsonType::discarded:
      LOG_FATAL("Unexpected JSON type!");
  }
}
|
|
|
|
|
|
|
|
class Metadata final {
|
|
|
|
private:
|
|
|
|
struct Record {
|
|
|
|
uint64_t count{0};
|
|
|
|
double average{0.0};
|
|
|
|
double minimum{std::numeric_limits<double>::infinity()};
|
|
|
|
double maximum{-std::numeric_limits<double>::infinity()};
|
|
|
|
};
|
|
|
|
|
|
|
|
public:
|
2022-02-22 20:33:45 +08:00
|
|
|
void Append(const std::map<std::string, memgraph::communication::bolt::Value> &values) {
|
2020-09-23 00:55:28 +08:00
|
|
|
for (const auto &item : values) {
|
|
|
|
if (!item.second.IsInt() && !item.second.IsDouble()) continue;
|
|
|
|
auto [it, emplaced] = storage_.emplace(item.first, Record());
|
|
|
|
auto &record = it->second;
|
|
|
|
double value = 0.0;
|
|
|
|
if (item.second.IsInt()) {
|
|
|
|
value = item.second.ValueInt();
|
|
|
|
} else {
|
|
|
|
value = item.second.ValueDouble();
|
|
|
|
}
|
|
|
|
++record.count;
|
|
|
|
record.average += value;
|
|
|
|
record.minimum = std::min(record.minimum, value);
|
|
|
|
record.maximum = std::max(record.maximum, value);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
nlohmann::json Export() {
|
|
|
|
nlohmann::json data = nlohmann::json::object();
|
|
|
|
for (const auto &item : storage_) {
|
|
|
|
nlohmann::json row = nlohmann::json::object();
|
|
|
|
row["average"] = item.second.average / item.second.count;
|
|
|
|
row["minimum"] = item.second.minimum;
|
|
|
|
row["maximum"] = item.second.maximum;
|
|
|
|
data[item.first] = row;
|
|
|
|
}
|
|
|
|
return data;
|
|
|
|
}
|
|
|
|
|
|
|
|
Metadata &operator+=(const Metadata &other) {
|
|
|
|
for (const auto &item : other.storage_) {
|
|
|
|
auto [it, emplaced] = storage_.emplace(item.first, Record());
|
|
|
|
auto &record = it->second;
|
|
|
|
record.count += item.second.count;
|
|
|
|
record.average += item.second.average;
|
|
|
|
record.minimum = std::min(record.minimum, item.second.minimum);
|
|
|
|
record.maximum = std::max(record.maximum, item.second.maximum);
|
|
|
|
}
|
|
|
|
return *this;
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
std::map<std::string, Record> storage_;
|
|
|
|
};
|
|
|
|
|
2023-02-18 00:54:05 +08:00
|
|
|
/// Builds latency statistics from the per-worker query durations.
///
/// @param worker_query_latency One vector of per-query durations for each worker.
/// @return JSON object that always contains "iterations" (total number of samples). When more than
///         10 samples are available it also contains "min", "max", "mean" and the "p50", "p75",
///         "p90", "p95" and "p99" percentiles.
nlohmann::json LatencyStatistics(std::vector<std::vector<double>> &worker_query_latency) {
  // Iterate over the container itself instead of indexing up to FLAGS_num_workers, so the function
  // can never read past the end when the flag and the vector size disagree.
  std::size_t total_samples = 0;
  for (const auto &worker_latency : worker_query_latency) {
    total_samples += worker_latency.size();
  }

  std::vector<double> query_latency;
  query_latency.reserve(total_samples);
  for (const auto &worker_latency : worker_query_latency) {
    query_latency.insert(query_latency.end(), worker_latency.begin(), worker_latency.end());
  }

  const auto iterations = query_latency.size();
  nlohmann::json statistics = nlohmann::json::object();
  statistics["iterations"] = iterations;

  constexpr std::size_t kLowerBound = 10;
  if (iterations <= kLowerBound) {
    spdlog::info("To few iterations to calculate latency values!");
    return statistics;
  }

  std::sort(query_latency.begin(), query_latency.end());

  // The fraction is always below 1, so the truncated index stays below `iterations`.
  const auto value_at = [&query_latency, iterations](double fraction) {
    return query_latency[static_cast<std::size_t>(iterations * fraction)];
  };

  statistics["min"] = query_latency.front();
  statistics["max"] = query_latency.back();
  statistics["mean"] = std::accumulate(query_latency.begin(), query_latency.end(), 0.0) / iterations;
  statistics["p99"] = value_at(0.99);
  statistics["p95"] = value_at(0.95);
  statistics["p90"] = value_at(0.90);
  statistics["p75"] = value_at(0.75);
  statistics["p50"] = value_at(0.50);
  return statistics;
}
|
|
|
|
|
2023-03-19 03:18:58 +08:00
|
|
|
void ExecuteTimeDependentWorkload(
|
|
|
|
const std::vector<std::pair<std::string, std::map<std::string, memgraph::communication::bolt::Value>>> &queries,
|
|
|
|
std::ostream *stream) {
|
|
|
|
std::vector<std::thread> threads;
|
|
|
|
threads.reserve(FLAGS_num_workers);
|
|
|
|
|
|
|
|
std::vector<uint64_t> worker_retries(FLAGS_num_workers, 0);
|
|
|
|
std::vector<Metadata> worker_metadata(FLAGS_num_workers, Metadata());
|
|
|
|
std::vector<double> worker_duration(FLAGS_num_workers, 0.0);
|
|
|
|
std::vector<std::vector<double>> worker_query_durations(FLAGS_num_workers);
|
|
|
|
|
|
|
|
// Start workers and execute queries.
|
|
|
|
auto size = queries.size();
|
|
|
|
std::atomic<bool> run(false);
|
|
|
|
std::atomic<uint64_t> ready(0);
|
|
|
|
std::atomic<uint64_t> position(0);
|
|
|
|
std::atomic<bool> start_workload_timer(false);
|
|
|
|
|
|
|
|
std::chrono::time_point<std::chrono::steady_clock> workload_start;
|
|
|
|
std::chrono::duration<double> time_limit = std::chrono::seconds(FLAGS_time_dependent_execution);
|
|
|
|
for (int worker = 0; worker < FLAGS_num_workers; ++worker) {
|
|
|
|
threads.push_back(std::thread([&, worker]() {
|
|
|
|
memgraph::io::network::Endpoint endpoint(FLAGS_address, FLAGS_port);
|
|
|
|
memgraph::communication::ClientContext context(FLAGS_use_ssl);
|
|
|
|
memgraph::communication::bolt::Client client(context);
|
|
|
|
client.Connect(endpoint, FLAGS_username, FLAGS_password);
|
|
|
|
|
|
|
|
ready.fetch_add(1, std::memory_order_acq_rel);
|
|
|
|
while (!run.load(std::memory_order_acq_rel))
|
|
|
|
;
|
|
|
|
auto &retries = worker_retries[worker];
|
|
|
|
auto &metadata = worker_metadata[worker];
|
|
|
|
auto &duration = worker_duration[worker];
|
|
|
|
auto &query_duration = worker_query_durations[worker];
|
|
|
|
|
|
|
|
// After all threads have been initialised, start the workload timer
|
|
|
|
if (!start_workload_timer.load()) {
|
|
|
|
workload_start = std::chrono::steady_clock::now();
|
|
|
|
start_workload_timer.store(true);
|
|
|
|
}
|
|
|
|
|
|
|
|
memgraph::utils::Timer worker_timer;
|
|
|
|
while (std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() -
|
|
|
|
workload_start) < time_limit) {
|
|
|
|
auto pos = position.fetch_add(1, std::memory_order_acq_rel);
|
|
|
|
if (pos >= size) {
|
|
|
|
/// Get back to inital position
|
|
|
|
position.store(0, std::memory_order_acq_rel);
|
|
|
|
pos = position.fetch_add(1, std::memory_order_acq_rel);
|
|
|
|
}
|
|
|
|
const auto &query = queries[pos];
|
|
|
|
memgraph::utils::Timer query_timer;
|
|
|
|
auto ret = ExecuteNTimesTillSuccess(&client, query.first, query.second, FLAGS_max_retries);
|
|
|
|
query_duration.emplace_back(query_timer.Elapsed().count());
|
|
|
|
retries += ret.second;
|
|
|
|
metadata.Append(ret.first);
|
|
|
|
duration = worker_timer.Elapsed().count();
|
|
|
|
}
|
|
|
|
client.Close();
|
|
|
|
}));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Synchronize workers and collect runtime.
|
|
|
|
while (ready.load(std::memory_order_acq_rel) < FLAGS_num_workers)
|
|
|
|
;
|
2023-03-22 04:44:11 +08:00
|
|
|
|
2023-03-19 03:18:58 +08:00
|
|
|
run.store(true);
|
|
|
|
for (int i = 0; i < FLAGS_num_workers; ++i) {
|
|
|
|
threads[i].join();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create and output summary.
|
|
|
|
Metadata final_metadata;
|
|
|
|
uint64_t final_retries = 0;
|
|
|
|
double final_duration = 0.0;
|
|
|
|
for (int i = 0; i < FLAGS_num_workers; ++i) {
|
|
|
|
final_metadata += worker_metadata[i];
|
|
|
|
final_retries += worker_retries[i];
|
|
|
|
final_duration += worker_duration[i];
|
|
|
|
}
|
|
|
|
|
|
|
|
int total_iterations = 0;
|
|
|
|
std::for_each(worker_query_durations.begin(), worker_query_durations.end(),
|
|
|
|
[&](const std::vector<double> &v) { total_iterations += v.size(); });
|
|
|
|
|
|
|
|
final_duration /= FLAGS_num_workers;
|
|
|
|
double execution_delta = time_limit.count() / final_duration;
|
2023-03-22 04:44:11 +08:00
|
|
|
|
2023-03-19 03:18:58 +08:00
|
|
|
// This is adjusted throughput based on how much longer did workload execution time took.
|
|
|
|
double throughput = (total_iterations / final_duration) * execution_delta;
|
|
|
|
double raw_throughput = total_iterations / final_duration;
|
|
|
|
|
|
|
|
nlohmann::json summary = nlohmann::json::object();
|
|
|
|
summary["count"] = queries.size();
|
|
|
|
summary["duration"] = final_duration;
|
|
|
|
summary["time_limit"] = FLAGS_time_dependent_execution;
|
|
|
|
summary["queries_executed"] = total_iterations;
|
|
|
|
summary["throughput"] = throughput;
|
|
|
|
summary["raw_throughput"] = raw_throughput;
|
|
|
|
summary["latency_stats"] = LatencyStatistics(worker_query_durations);
|
|
|
|
summary["retries"] = final_retries;
|
|
|
|
summary["metadata"] = final_metadata.Export();
|
|
|
|
summary["num_workers"] = FLAGS_num_workers;
|
|
|
|
|
|
|
|
(*stream) << summary.dump() << std::endl;
|
|
|
|
}
|
|
|
|
|
|
|
|
void ExecuteWorkload(
|
2022-02-22 20:33:45 +08:00
|
|
|
const std::vector<std::pair<std::string, std::map<std::string, memgraph::communication::bolt::Value>>> &queries,
|
|
|
|
std::ostream *stream) {
|
2020-09-23 00:55:28 +08:00
|
|
|
std::vector<std::thread> threads;
|
|
|
|
threads.reserve(FLAGS_num_workers);
|
|
|
|
|
2023-04-03 22:29:21 +08:00
|
|
|
auto total_time_start = std::chrono::steady_clock::now();
|
|
|
|
|
2020-09-23 00:55:28 +08:00
|
|
|
std::vector<uint64_t> worker_retries(FLAGS_num_workers, 0);
|
|
|
|
std::vector<Metadata> worker_metadata(FLAGS_num_workers, Metadata());
|
|
|
|
std::vector<double> worker_duration(FLAGS_num_workers, 0.0);
|
2023-02-18 00:54:05 +08:00
|
|
|
std::vector<std::vector<double>> worker_query_durations(FLAGS_num_workers);
|
2020-09-23 00:55:28 +08:00
|
|
|
|
|
|
|
// Start workers and execute queries.
|
|
|
|
auto size = queries.size();
|
|
|
|
std::atomic<bool> run(false);
|
|
|
|
std::atomic<uint64_t> ready(0);
|
|
|
|
std::atomic<uint64_t> position(0);
|
|
|
|
for (int worker = 0; worker < FLAGS_num_workers; ++worker) {
|
|
|
|
threads.push_back(std::thread([&, worker]() {
|
2022-02-22 20:33:45 +08:00
|
|
|
memgraph::io::network::Endpoint endpoint(FLAGS_address, FLAGS_port);
|
|
|
|
memgraph::communication::ClientContext context(FLAGS_use_ssl);
|
2022-08-26 19:19:27 +08:00
|
|
|
memgraph::communication::bolt::Client client(context);
|
2020-09-23 00:55:28 +08:00
|
|
|
client.Connect(endpoint, FLAGS_username, FLAGS_password);
|
|
|
|
|
|
|
|
ready.fetch_add(1, std::memory_order_acq_rel);
|
|
|
|
while (!run.load(std::memory_order_acq_rel))
|
|
|
|
;
|
|
|
|
|
|
|
|
auto &retries = worker_retries[worker];
|
|
|
|
auto &metadata = worker_metadata[worker];
|
|
|
|
auto &duration = worker_duration[worker];
|
2023-02-18 00:54:05 +08:00
|
|
|
auto &query_duration = worker_query_durations[worker];
|
|
|
|
|
|
|
|
memgraph::utils::Timer worker_timer;
|
2020-09-23 00:55:28 +08:00
|
|
|
while (true) {
|
|
|
|
auto pos = position.fetch_add(1, std::memory_order_acq_rel);
|
|
|
|
if (pos >= size) break;
|
|
|
|
const auto &query = queries[pos];
|
2023-02-18 00:54:05 +08:00
|
|
|
memgraph::utils::Timer query_timer;
|
2021-02-18 22:32:43 +08:00
|
|
|
auto ret = ExecuteNTimesTillSuccess(&client, query.first, query.second, FLAGS_max_retries);
|
2023-03-19 03:18:58 +08:00
|
|
|
query_duration.emplace_back(query_timer.Elapsed().count());
|
2020-09-23 00:55:28 +08:00
|
|
|
retries += ret.second;
|
|
|
|
metadata.Append(ret.first);
|
|
|
|
}
|
2023-02-18 00:54:05 +08:00
|
|
|
duration = worker_timer.Elapsed().count();
|
2020-09-23 00:55:28 +08:00
|
|
|
client.Close();
|
|
|
|
}));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Synchronize workers and collect runtime.
|
|
|
|
while (ready.load(std::memory_order_acq_rel) < FLAGS_num_workers)
|
|
|
|
;
|
|
|
|
run.store(true, std::memory_order_acq_rel);
|
2023-03-19 03:18:58 +08:00
|
|
|
|
2020-09-23 00:55:28 +08:00
|
|
|
for (int i = 0; i < FLAGS_num_workers; ++i) {
|
|
|
|
threads[i].join();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create and output summary.
|
|
|
|
Metadata final_metadata;
|
|
|
|
uint64_t final_retries = 0;
|
|
|
|
double final_duration = 0.0;
|
|
|
|
for (int i = 0; i < FLAGS_num_workers; ++i) {
|
|
|
|
final_metadata += worker_metadata[i];
|
|
|
|
final_retries += worker_retries[i];
|
|
|
|
final_duration += worker_duration[i];
|
|
|
|
}
|
2023-02-18 00:54:05 +08:00
|
|
|
|
2023-04-03 22:29:21 +08:00
|
|
|
auto total_time_end = std::chrono::steady_clock::now();
|
|
|
|
auto total_time = std::chrono::duration_cast<std::chrono::duration<double>>(total_time_end - total_time_start);
|
|
|
|
|
2020-09-23 00:55:28 +08:00
|
|
|
final_duration /= FLAGS_num_workers;
|
|
|
|
nlohmann::json summary = nlohmann::json::object();
|
2023-04-03 22:29:21 +08:00
|
|
|
summary["total_time"] = total_time.count();
|
2020-09-23 00:55:28 +08:00
|
|
|
summary["count"] = queries.size();
|
|
|
|
summary["duration"] = final_duration;
|
|
|
|
summary["throughput"] = static_cast<double>(queries.size()) / final_duration;
|
|
|
|
summary["retries"] = final_retries;
|
|
|
|
summary["metadata"] = final_metadata.Export();
|
|
|
|
summary["num_workers"] = FLAGS_num_workers;
|
2023-02-18 00:54:05 +08:00
|
|
|
summary["latency_stats"] = LatencyStatistics(worker_query_durations);
|
|
|
|
(*stream) << summary.dump() << std::endl;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Converts result records into a JSON object keyed by the record's index ("0", "1", ...).
///
/// @param results Records returned by a query; each record is a vector of Bolt values.
/// @return JSON object whose values are the stream-formatted text of the corresponding record.
nlohmann::json BoltRecordsToJSONStrings(std::vector<std::vector<memgraph::communication::bolt::Value>> &results) {
  nlohmann::json res = nlohmann::json::object();
  for (std::size_t i = 0; i < results.size(); ++i) {
    // A fresh stream per record: reusing one stream would make entry i contain records 0..i.
    std::ostringstream oss;
    oss << results[i];
    res[std::to_string(i)] = oss.str();
  }
  return res;
}
|
|
|
|
|
|
|
|
/// Validation mode works on single thread with 1 query.
|
|
|
|
void ExecuteValidation(
|
|
|
|
const std::vector<std::pair<std::string, std::map<std::string, memgraph::communication::bolt::Value>>> &queries,
|
|
|
|
std::ostream *stream) {
|
|
|
|
spdlog::info("Running validation mode, number of workers forced to 1");
|
|
|
|
FLAGS_num_workers = 1;
|
|
|
|
|
|
|
|
Metadata metadata = Metadata();
|
|
|
|
double duration = 0.0;
|
|
|
|
std::vector<std::vector<memgraph::communication::bolt::Value>> results;
|
|
|
|
|
|
|
|
auto size = queries.size();
|
|
|
|
|
|
|
|
memgraph::io::network::Endpoint endpoint(FLAGS_address, FLAGS_port);
|
|
|
|
memgraph::communication::ClientContext context(FLAGS_use_ssl);
|
|
|
|
memgraph::communication::bolt::Client client(context);
|
|
|
|
client.Connect(endpoint, FLAGS_username, FLAGS_password);
|
|
|
|
|
|
|
|
memgraph::utils::Timer timer;
|
|
|
|
if (size == 1) {
|
|
|
|
const auto &query = queries[0];
|
|
|
|
auto ret = ExecuteValidationNTimesTillSuccess(&client, query.first, query.second, FLAGS_max_retries);
|
|
|
|
metadata.Append(ret.first);
|
|
|
|
results = ret.second;
|
|
|
|
duration = timer.Elapsed().count();
|
|
|
|
client.Close();
|
|
|
|
} else {
|
|
|
|
spdlog::info("Validation works with single query, pass just one query!");
|
|
|
|
}
|
|
|
|
|
|
|
|
nlohmann::json summary = nlohmann::json::object();
|
|
|
|
summary["count"] = 1;
|
|
|
|
summary["duration"] = duration;
|
|
|
|
summary["metadata"] = metadata.Export();
|
|
|
|
summary["results"] = BoltRecordsToJSONStrings(results);
|
|
|
|
summary["num_workers"] = FLAGS_num_workers;
|
|
|
|
|
2020-09-23 00:55:28 +08:00
|
|
|
(*stream) << summary.dump() << std::endl;
|
|
|
|
}
|
|
|
|
|
|
|
|
int main(int argc, char **argv) {
|
|
|
|
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
|
|
|
|
2023-02-18 00:54:05 +08:00
|
|
|
spdlog::info("Running a bolt client with following settings:");
|
2023-07-30 20:05:05 +08:00
|
|
|
spdlog::info("Address: {} ", FLAGS_address);
|
2023-02-18 00:54:05 +08:00
|
|
|
spdlog::info("Port: {} ", FLAGS_port);
|
|
|
|
spdlog::info("Username: {} ", FLAGS_username);
|
|
|
|
spdlog::info("Password: {} ", FLAGS_password);
|
|
|
|
spdlog::info("Usessl: {} ", FLAGS_use_ssl);
|
|
|
|
spdlog::info("Num of worker: {}", FLAGS_num_workers);
|
|
|
|
spdlog::info("Max retries: {}", FLAGS_max_retries);
|
|
|
|
spdlog::info("Query JSON: {}", FLAGS_queries_json);
|
|
|
|
spdlog::info("Input: {}", FLAGS_input);
|
|
|
|
spdlog::info("Output: {}", FLAGS_output);
|
|
|
|
spdlog::info("Validation: {}", FLAGS_validation);
|
2023-03-19 03:18:58 +08:00
|
|
|
spdlog::info("Time dependend execution: {}", FLAGS_time_dependent_execution);
|
2023-02-18 00:54:05 +08:00
|
|
|
|
2022-02-22 20:33:45 +08:00
|
|
|
memgraph::communication::SSLInit sslInit;
|
2020-09-23 00:55:28 +08:00
|
|
|
|
|
|
|
std::ifstream ifile;
|
|
|
|
std::istream *istream{&std::cin};
|
|
|
|
if (FLAGS_input != "") {
|
2021-02-18 22:32:43 +08:00
|
|
|
MG_ASSERT(std::filesystem::is_regular_file(FLAGS_input), "Input file isn't a regular file or it doesn't exist!");
|
2020-09-23 00:55:28 +08:00
|
|
|
ifile.open(FLAGS_input);
|
2021-01-21 22:47:56 +08:00
|
|
|
MG_ASSERT(ifile, "Couldn't open input file!");
|
2020-09-23 00:55:28 +08:00
|
|
|
istream = &ifile;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::ofstream ofile;
|
|
|
|
std::ostream *ostream{&std::cout};
|
|
|
|
if (FLAGS_output != "") {
|
|
|
|
ofile.open(FLAGS_output);
|
2021-01-21 22:47:56 +08:00
|
|
|
MG_ASSERT(ifile, "Couldn't open output file!");
|
2020-09-23 00:55:28 +08:00
|
|
|
ostream = &ofile;
|
|
|
|
}
|
|
|
|
|
2022-02-22 20:33:45 +08:00
|
|
|
std::vector<std::pair<std::string, std::map<std::string, memgraph::communication::bolt::Value>>> queries;
|
2020-09-23 00:55:28 +08:00
|
|
|
if (!FLAGS_queries_json) {
|
|
|
|
// Load simple queries.
|
|
|
|
std::string query;
|
|
|
|
while (std::getline(*istream, query)) {
|
2022-02-22 20:33:45 +08:00
|
|
|
auto trimmed = memgraph::utils::Trim(query);
|
2020-09-23 00:55:28 +08:00
|
|
|
if (trimmed == "" || trimmed == ";") {
|
2023-03-19 03:18:58 +08:00
|
|
|
ExecuteWorkload(queries, ostream);
|
2020-09-23 00:55:28 +08:00
|
|
|
queries.clear();
|
|
|
|
continue;
|
|
|
|
}
|
2022-02-22 20:33:45 +08:00
|
|
|
queries.emplace_back(query, std::map<std::string, memgraph::communication::bolt::Value>{});
|
2020-09-23 00:55:28 +08:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// Load advanced queries.
|
|
|
|
std::string row;
|
|
|
|
while (std::getline(*istream, row)) {
|
|
|
|
auto data = nlohmann::json::parse(row);
|
2021-01-21 22:47:56 +08:00
|
|
|
MG_ASSERT(data.is_array() && data.size() > 0,
|
|
|
|
"The root item of the loaded JSON queries must be a non-empty "
|
|
|
|
"array!");
|
2021-02-18 22:32:43 +08:00
|
|
|
MG_ASSERT(data.is_array() && data.size() == 2, "Each item of the loaded JSON queries must be an array!");
|
2020-09-23 00:55:28 +08:00
|
|
|
if (data.size() == 0) {
|
2023-03-19 03:18:58 +08:00
|
|
|
ExecuteWorkload(queries, ostream);
|
2020-09-23 00:55:28 +08:00
|
|
|
queries.clear();
|
|
|
|
continue;
|
|
|
|
}
|
2021-01-21 22:47:56 +08:00
|
|
|
MG_ASSERT(data.size() == 2,
|
|
|
|
"Each item of the loaded JSON queries that has "
|
|
|
|
"data must be an array of length 2!");
|
2020-09-23 00:55:28 +08:00
|
|
|
const auto &query = data[0];
|
|
|
|
const auto ¶m = data[1];
|
2021-01-21 22:47:56 +08:00
|
|
|
MG_ASSERT(query.is_string() && param.is_object(),
|
|
|
|
"The query must be a string and the parameters must be a "
|
|
|
|
"dictionary!");
|
2020-09-23 00:55:28 +08:00
|
|
|
auto bolt_param = JsonToBoltValue(param);
|
2021-01-21 22:47:56 +08:00
|
|
|
MG_ASSERT(bolt_param.IsMap(), "The Bolt parameters must be a map!");
|
2020-09-23 00:55:28 +08:00
|
|
|
queries.emplace_back(query, std::move(bolt_param.ValueMap()));
|
|
|
|
}
|
|
|
|
}
|
2023-02-18 00:54:05 +08:00
|
|
|
|
2023-03-19 03:18:58 +08:00
|
|
|
if (FLAGS_validation) {
|
2023-02-18 00:54:05 +08:00
|
|
|
ExecuteValidation(queries, ostream);
|
2023-03-19 03:18:58 +08:00
|
|
|
} else if (FLAGS_time_dependent_execution > 0) {
|
|
|
|
ExecuteTimeDependentWorkload(queries, ostream);
|
|
|
|
} else {
|
|
|
|
ExecuteWorkload(queries, ostream);
|
2023-02-18 00:54:05 +08:00
|
|
|
}
|
2020-09-23 00:55:28 +08:00
|
|
|
|
|
|
|
return 0;
|
|
|
|
}
|