Do preprocess in client
Reviewers: buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D793
This commit is contained in:
parent
59b9b7af21
commit
d640ca3f1a
@ -67,8 +67,10 @@ class QueryClient:
|
|||||||
str(queries), return_code, stderr)
|
str(queries), return_code, stderr)
|
||||||
raise Exception("BoltClient execution failed")
|
raise Exception("BoltClient execution failed")
|
||||||
|
|
||||||
|
data = {"groups" : []}
|
||||||
with open(output) as f:
|
with open(output) as f:
|
||||||
data = json.loads(f.read())
|
for line in f:
|
||||||
|
data["groups"].append(json.loads(line))
|
||||||
data[CPU_TIME] = cpu_time_end - cpu_time_start
|
data[CPU_TIME] = cpu_time_end - cpu_time_start
|
||||||
data[MAX_MEMORY] = usage["max_memory"]
|
data[MAX_MEMORY] = usage["max_memory"]
|
||||||
|
|
||||||
|
@ -16,8 +16,9 @@ using communication::bolt::DecodedValue;
|
|||||||
|
|
||||||
class BoltClient {
|
class BoltClient {
|
||||||
public:
|
public:
|
||||||
BoltClient(std::string &address, std::string &port, std::string &username,
|
BoltClient(const std::string &address, const std::string &port,
|
||||||
std::string &password, std::string database = "") {
|
const std::string &username, const std::string &password,
|
||||||
|
const std::string & = "") {
|
||||||
SocketT socket;
|
SocketT socket;
|
||||||
EndpointT endpoint;
|
EndpointT endpoint;
|
||||||
|
|
||||||
|
@ -70,6 +70,7 @@ communication::bolt::QueryData ExecuteNTimesTillSuccess(
|
|||||||
std::chrono::milliseconds(rand_dist_(pseudo_rand_gen_)));
|
std::chrono::milliseconds(rand_dist_(pseudo_rand_gen_)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LOG(WARNING) << query << " failed " << times << "times";
|
||||||
throw last_exception;
|
throw last_exception;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
#include <glog/logging.h>
|
#include <glog/logging.h>
|
||||||
#include <json/json.hpp>
|
#include <json/json.hpp>
|
||||||
|
|
||||||
|
#include "bolt_client.hpp"
|
||||||
#include "common.hpp"
|
#include "common.hpp"
|
||||||
#include "communication/bolt/client.hpp"
|
#include "communication/bolt/client.hpp"
|
||||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||||
@ -23,9 +24,6 @@
|
|||||||
#include "utils/assert.hpp"
|
#include "utils/assert.hpp"
|
||||||
#include "utils/timer.hpp"
|
#include "utils/timer.hpp"
|
||||||
|
|
||||||
using SocketT = io::network::Socket;
|
|
||||||
using EndpointT = io::network::NetworkEndpoint;
|
|
||||||
using Client = communication::bolt::Client<SocketT>;
|
|
||||||
using communication::bolt::DecodedValue;
|
using communication::bolt::DecodedValue;
|
||||||
using communication::bolt::DecodedVertex;
|
using communication::bolt::DecodedVertex;
|
||||||
using communication::bolt::DecodedEdge;
|
using communication::bolt::DecodedEdge;
|
||||||
@ -39,7 +37,6 @@ DEFINE_string(password, "", "Password for the database");
|
|||||||
DEFINE_int32(duration, 30, "Number of seconds to execute benchmark");
|
DEFINE_int32(duration, 30, "Number of seconds to execute benchmark");
|
||||||
|
|
||||||
const int MAX_RETRIES = 30;
|
const int MAX_RETRIES = 30;
|
||||||
const int NUM_BUCKETS = 100;
|
|
||||||
|
|
||||||
struct VertexAndEdges {
|
struct VertexAndEdges {
|
||||||
DecodedVertex vertex;
|
DecodedVertex vertex;
|
||||||
@ -47,17 +44,23 @@ struct VertexAndEdges {
|
|||||||
std::vector<DecodedVertex> vertices;
|
std::vector<DecodedVertex> vertices;
|
||||||
};
|
};
|
||||||
|
|
||||||
std::pair<VertexAndEdges, int> DetachDeleteVertex(Client &client,
|
std::pair<VertexAndEdges, int> DetachDeleteVertex(BoltClient &client,
|
||||||
const std::string &label,
|
const std::string &label,
|
||||||
int64_t id) {
|
int64_t id) {
|
||||||
|
auto vertex_record =
|
||||||
|
ExecuteNTimesTillSuccess(
|
||||||
|
client, "MATCH (n :" + label + " {id : $id}) RETURN n",
|
||||||
|
std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES)
|
||||||
|
.records;
|
||||||
|
CHECK(vertex_record.size() == 1U) << "id : " << id << " "
|
||||||
|
<< vertex_record.size();
|
||||||
|
|
||||||
auto records =
|
auto records =
|
||||||
ExecuteNTimesTillSuccess(
|
ExecuteNTimesTillSuccess(
|
||||||
client, "MATCH (n :" + label + " {id : $id})-[e]-(m) RETURN n, e, m",
|
client, "MATCH (n :" + label + " {id : $id})-[e]-(m) RETURN n, e, m",
|
||||||
std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES)
|
std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES)
|
||||||
.records;
|
.records;
|
||||||
|
|
||||||
if (records.size() == 0U) return {{}, 1};
|
|
||||||
|
|
||||||
ExecuteNTimesTillSuccess(
|
ExecuteNTimesTillSuccess(
|
||||||
client, "MATCH (n :" + label + " {id : $id})-[]-(m) DETACH DELETE n",
|
client, "MATCH (n :" + label + " {id : $id})-[]-(m) DETACH DELETE n",
|
||||||
std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES);
|
std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES);
|
||||||
@ -74,10 +77,11 @@ std::pair<VertexAndEdges, int> DetachDeleteVertex(Client &client,
|
|||||||
vertices.push_back(record[2].ValueVertex());
|
vertices.push_back(record[2].ValueVertex());
|
||||||
}
|
}
|
||||||
|
|
||||||
return {{records[0][0].ValueVertex(), edges, vertices}, 2};
|
return {{vertex_record[0][0].ValueVertex(), edges, vertices}, 3};
|
||||||
}
|
}
|
||||||
|
|
||||||
int ReturnVertexAndEdges(Client &client, const VertexAndEdges &vertex_and_edges,
|
int ReturnVertexAndEdges(BoltClient &client,
|
||||||
|
const VertexAndEdges &vertex_and_edges,
|
||||||
const std::string &independent_label) {
|
const std::string &independent_label) {
|
||||||
int num_queries = 0;
|
int num_queries = 0;
|
||||||
{
|
{
|
||||||
@ -134,13 +138,35 @@ int ReturnVertexAndEdges(Client &client, const VertexAndEdges &vertex_and_edges,
|
|||||||
if (x.type() == DecodedValue::Type::Double) {
|
if (x.type() == DecodedValue::Type::Double) {
|
||||||
LOG_EVERY_N(INFO, 5000) << "exec " << x.ValueDouble() << " planning "
|
LOG_EVERY_N(INFO, 5000) << "exec " << x.ValueDouble() << " planning "
|
||||||
<< y.ValueDouble();
|
<< y.ValueDouble();
|
||||||
CHECK(ret.records.size() == 1U) << "Graph in invalid state";
|
|
||||||
}
|
}
|
||||||
|
CHECK(ret.records.size() == 1U)
|
||||||
|
<< "Graph in invalid state "
|
||||||
|
<< vertex_and_edges.vertex.properties.at("id");
|
||||||
++num_queries;
|
++num_queries;
|
||||||
}
|
}
|
||||||
return num_queries;
|
return num_queries;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t NumNodes(BoltClient &client, const std::string &label) {
|
||||||
|
auto result = ExecuteNTimesTillSuccess(
|
||||||
|
client, "MATCH (n :" + label + ") RETURN COUNT(n) as cnt", {},
|
||||||
|
MAX_RETRIES);
|
||||||
|
return result.records[0][0].ValueInt();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<int64_t> Neighbours(BoltClient &client, const std::string &label,
|
||||||
|
int64_t id) {
|
||||||
|
auto result = ExecuteNTimesTillSuccess(
|
||||||
|
client, "MATCH (n :" + label + " {id: " + std::to_string(id) +
|
||||||
|
"})-[e]-(m) RETURN m.id",
|
||||||
|
{}, MAX_RETRIES);
|
||||||
|
std::vector<int64_t> ret;
|
||||||
|
for (const auto &record : result.records) {
|
||||||
|
ret.push_back(record[0].ValueInt());
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char **argv) {
|
int main(int argc, char **argv) {
|
||||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||||
google::InitGoogleLogging(argv[0]);
|
google::InitGoogleLogging(argv[0]);
|
||||||
@ -149,40 +175,60 @@ int main(int argc, char **argv) {
|
|||||||
std::cin >> config;
|
std::cin >> config;
|
||||||
const auto &queries = config["queries"];
|
const auto &queries = config["queries"];
|
||||||
const double read_probability = config["read_probability"];
|
const double read_probability = config["read_probability"];
|
||||||
const int64_t num_independent_nodes = config["num_independent_nodes"];
|
|
||||||
const std::string independent_label = config["independent_label"];
|
const std::string independent_label = config["independent_label"];
|
||||||
const int64_t num_nodes = config["num_nodes"];
|
std::vector<int64_t> independent_nodes_ids;
|
||||||
|
|
||||||
|
BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username, FLAGS_password);
|
||||||
|
const int64_t num_nodes = NumNodes(client, independent_label);
|
||||||
|
{
|
||||||
|
std::vector<int64_t> ids;
|
||||||
|
std::unordered_set<int64_t> independent;
|
||||||
|
for (int64_t i = 1; i <= num_nodes; ++i) {
|
||||||
|
ids.push_back(i);
|
||||||
|
independent.insert(i);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
std::mt19937 mt;
|
||||||
|
std::shuffle(ids.begin(), ids.end(), mt);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto i : ids) {
|
||||||
|
if (independent.find(i) == independent.end()) continue;
|
||||||
|
independent.erase(i);
|
||||||
|
std::vector<int64_t> neighbour_ids =
|
||||||
|
Neighbours(client, independent_label, i);
|
||||||
|
independent_nodes_ids.push_back(i);
|
||||||
|
for (auto j : neighbour_ids) {
|
||||||
|
independent.erase(j);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
utils::Timer timer;
|
utils::Timer timer;
|
||||||
std::vector<std::thread> threads;
|
std::vector<std::thread> threads;
|
||||||
std::atomic<int64_t> executed_queries{0};
|
std::atomic<int64_t> executed_queries{0};
|
||||||
std::atomic<bool> keep_running{true};
|
std::atomic<bool> keep_running{true};
|
||||||
|
|
||||||
|
LOG(INFO) << "nodes " << num_nodes << " independent "
|
||||||
|
<< independent_nodes_ids.size();
|
||||||
|
|
||||||
|
int64_t next_to_assign = 0;
|
||||||
for (int i = 0; i < FLAGS_num_workers; ++i) {
|
for (int i = 0; i < FLAGS_num_workers; ++i) {
|
||||||
|
int64_t size = independent_nodes_ids.size();
|
||||||
|
int64_t next_next_to_assign = next_to_assign + size / FLAGS_num_workers +
|
||||||
|
(i < size % FLAGS_num_workers);
|
||||||
|
std::vector<int64_t> to_remove(
|
||||||
|
independent_nodes_ids.begin() + next_to_assign,
|
||||||
|
independent_nodes_ids.begin() + next_next_to_assign);
|
||||||
|
LOG(INFO) << next_to_assign << " " << next_next_to_assign;
|
||||||
|
next_to_assign = next_next_to_assign;
|
||||||
|
|
||||||
threads.emplace_back(
|
threads.emplace_back(
|
||||||
[&](int thread_id) {
|
[&](int thread_id, std::vector<int64_t> to_remove) {
|
||||||
// Initialise client.
|
BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username,
|
||||||
SocketT socket;
|
FLAGS_password);
|
||||||
EndpointT endpoint;
|
|
||||||
try {
|
|
||||||
endpoint = EndpointT(FLAGS_address, FLAGS_port);
|
|
||||||
} catch (const io::network::NetworkEndpointException &e) {
|
|
||||||
LOG(FATAL) << "Invalid address or port: " << FLAGS_address << ":"
|
|
||||||
<< FLAGS_port;
|
|
||||||
}
|
|
||||||
if (!socket.Connect(endpoint)) {
|
|
||||||
LOG(FATAL) << "Could not connect to: " << FLAGS_address << ":"
|
|
||||||
<< FLAGS_port;
|
|
||||||
}
|
|
||||||
Client client(std::move(socket), FLAGS_username, FLAGS_password);
|
|
||||||
|
|
||||||
std::mt19937 random_gen(thread_id);
|
std::mt19937 random_gen(thread_id);
|
||||||
int64_t to_remove =
|
|
||||||
num_independent_nodes / FLAGS_num_workers * thread_id + 1;
|
|
||||||
int64_t last_to_remove =
|
|
||||||
to_remove + num_independent_nodes / FLAGS_num_workers;
|
|
||||||
bool remove = true;
|
|
||||||
int64_t num_shifts = 0;
|
|
||||||
std::vector<VertexAndEdges> removed;
|
std::vector<VertexAndEdges> removed;
|
||||||
|
|
||||||
while (keep_running) {
|
while (keep_running) {
|
||||||
@ -203,40 +249,34 @@ int main(int argc, char **argv) {
|
|||||||
MAX_RETRIES);
|
MAX_RETRIES);
|
||||||
++executed_queries;
|
++executed_queries;
|
||||||
} else {
|
} else {
|
||||||
if (!remove) {
|
if (real_dist(random_gen) <
|
||||||
|
static_cast<double>(removed.size()) /
|
||||||
|
(removed.size() + to_remove.size())) {
|
||||||
|
CHECK(removed.size());
|
||||||
|
std::uniform_int_distribution<> int_dist(0, removed.size() - 1);
|
||||||
|
std::swap(removed.back(), removed[int_dist(random_gen)]);
|
||||||
executed_queries += ReturnVertexAndEdges(client, removed.back(),
|
executed_queries += ReturnVertexAndEdges(client, removed.back(),
|
||||||
independent_label);
|
independent_label);
|
||||||
|
to_remove.push_back(
|
||||||
|
removed.back().vertex.properties["id"].ValueInt());
|
||||||
removed.pop_back();
|
removed.pop_back();
|
||||||
if (removed.empty()) {
|
|
||||||
remove = true;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
auto ret =
|
CHECK(to_remove.size());
|
||||||
DetachDeleteVertex(client, independent_label, to_remove);
|
std::uniform_int_distribution<> int_dist(0,
|
||||||
++to_remove;
|
to_remove.size() - 1);
|
||||||
|
std::swap(to_remove.back(), to_remove[int_dist(random_gen)]);
|
||||||
|
auto ret = DetachDeleteVertex(client, independent_label,
|
||||||
|
to_remove.back());
|
||||||
|
removed.push_back(ret.first);
|
||||||
|
to_remove.pop_back();
|
||||||
executed_queries += ret.second;
|
executed_queries += ret.second;
|
||||||
if (ret.second > 1) {
|
|
||||||
removed.push_back(std::move(ret.first));
|
|
||||||
}
|
|
||||||
if (to_remove == last_to_remove) {
|
|
||||||
for (auto &x : removed) {
|
|
||||||
x.vertex.properties["id"].ValueInt() += num_nodes;
|
|
||||||
}
|
|
||||||
remove = false;
|
|
||||||
++num_shifts;
|
|
||||||
to_remove =
|
|
||||||
num_independent_nodes / FLAGS_num_workers * thread_id +
|
|
||||||
1 + num_shifts * num_nodes;
|
|
||||||
last_to_remove =
|
|
||||||
to_remove + num_independent_nodes / FLAGS_num_workers;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
client.Close();
|
client.Close();
|
||||||
},
|
},
|
||||||
i);
|
i, std::move(to_remove));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open stream for writing stats.
|
// Open stream for writing stats.
|
||||||
|
@ -28,8 +28,9 @@ class ClientQueryException : public ClientException {
|
|||||||
|
|
||||||
class Client {
|
class Client {
|
||||||
public:
|
public:
|
||||||
Client(std::string &host, std::string &port, std::string &username,
|
Client(const std::string &host, const std::string &port,
|
||||||
std::string &password, std::string database = "") {
|
const std::string &username, const std::string &password,
|
||||||
|
const std::string &database = "") {
|
||||||
// https://www.postgresql.org/docs/9.4/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS
|
// https://www.postgresql.org/docs/9.4/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS
|
||||||
std::string pass = "";
|
std::string pass = "";
|
||||||
if (password != "") {
|
if (password != "") {
|
||||||
|
@ -8,6 +8,7 @@
|
|||||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||||
#include "threading/sync/spinlock.hpp"
|
#include "threading/sync/spinlock.hpp"
|
||||||
#include "utils/algorithm.hpp"
|
#include "utils/algorithm.hpp"
|
||||||
|
#include "utils/string.hpp"
|
||||||
#include "utils/timer.hpp"
|
#include "utils/timer.hpp"
|
||||||
|
|
||||||
#include "bolt_client.hpp"
|
#include "bolt_client.hpp"
|
||||||
@ -45,25 +46,20 @@ void PrintSummary(
|
|||||||
os << "{\"wall_time\": " << duration << ", "
|
os << "{\"wall_time\": " << duration << ", "
|
||||||
<< "\"metadatas\": ";
|
<< "\"metadatas\": ";
|
||||||
PrintJsonMetadata(os, metadata);
|
PrintJsonMetadata(os, metadata);
|
||||||
os << "}";
|
os << "}\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename ClientT, typename ExceptionT>
|
template <typename ClientT>
|
||||||
void ExecuteQueries(std::istream &istream, int num_workers,
|
void ExecuteQueries(const std::vector<std::string> &queries, int num_workers,
|
||||||
std::ostream &ostream, std::string &address,
|
std::ostream &ostream, std::string &address,
|
||||||
std::string &port, std::string &username,
|
std::string &port, std::string &username,
|
||||||
std::string &password, std::string &database) {
|
std::string &password, std::string &database) {
|
||||||
std::string query;
|
|
||||||
std::vector<std::thread> threads;
|
std::vector<std::thread> threads;
|
||||||
|
|
||||||
SpinLock spinlock;
|
SpinLock spinlock;
|
||||||
uint64_t last = 0;
|
uint64_t last = 0;
|
||||||
std::vector<std::string> queries;
|
|
||||||
std::vector<std::map<std::string, DecodedValue>> metadata;
|
std::vector<std::map<std::string, DecodedValue>> metadata;
|
||||||
|
|
||||||
while (std::getline(istream, query)) {
|
|
||||||
queries.push_back(query);
|
|
||||||
}
|
|
||||||
metadata.resize(queries.size());
|
metadata.resize(queries.size());
|
||||||
|
|
||||||
utils::Timer timer;
|
utils::Timer timer;
|
||||||
@ -86,7 +82,7 @@ void ExecuteQueries(std::istream &istream, int num_workers,
|
|||||||
try {
|
try {
|
||||||
metadata[pos] =
|
metadata[pos] =
|
||||||
ExecuteNTimesTillSuccess(client, str, {}, MAX_RETRIES).metadata;
|
ExecuteNTimesTillSuccess(client, str, {}, MAX_RETRIES).metadata;
|
||||||
} catch (const ExceptionT &e) {
|
} catch (const utils::BasicException &e) {
|
||||||
LOG(FATAL) << "Could not execute query '" << str << "' "
|
LOG(FATAL) << "Could not execute query '" << str << "' "
|
||||||
<< MAX_RETRIES << " times! Error message: " << e.what();
|
<< MAX_RETRIES << " times! Error message: " << e.what();
|
||||||
}
|
}
|
||||||
@ -128,12 +124,22 @@ int main(int argc, char **argv) {
|
|||||||
std::string port = FLAGS_port;
|
std::string port = FLAGS_port;
|
||||||
if (FLAGS_protocol == "bolt") {
|
if (FLAGS_protocol == "bolt") {
|
||||||
if (port == "") port = "7687";
|
if (port == "") port = "7687";
|
||||||
|
} else if (FLAGS_protocol == "postgres") {
|
||||||
|
if (port == "") port = "5432";
|
||||||
|
}
|
||||||
|
|
||||||
using BoltClientT = BoltClient;
|
while (!istream->eof()) {
|
||||||
using BoltExceptionT = communication::bolt::ClientQueryException;
|
std::vector<std::string> queries;
|
||||||
ExecuteQueries<BoltClientT, BoltExceptionT>(
|
std::string query;
|
||||||
*istream, FLAGS_num_workers, *ostream, FLAGS_address, port,
|
while (std::getline(*istream, query) && utils::Trim(query) != "" &&
|
||||||
FLAGS_username, FLAGS_password, FLAGS_database);
|
utils::Trim(query) != ";") {
|
||||||
|
queries.push_back(query);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (FLAGS_protocol == "bolt") {
|
||||||
|
ExecuteQueries<BoltClient>(queries, FLAGS_num_workers, *ostream,
|
||||||
|
FLAGS_address, port, FLAGS_username,
|
||||||
|
FLAGS_password, FLAGS_database);
|
||||||
} else if (FLAGS_protocol == "postgres") {
|
} else if (FLAGS_protocol == "postgres") {
|
||||||
LOG(FATAL) << "Postgres not yet supported";
|
LOG(FATAL) << "Postgres not yet supported";
|
||||||
// TODO: Currently libpq is linked dynamically so it is a pain to move
|
// TODO: Currently libpq is linked dynamically so it is a pain to move
|
||||||
@ -150,6 +156,7 @@ int main(int argc, char **argv) {
|
|||||||
// *istream, FLAGS_num_workers, *ostream, FLAGS_address, port,
|
// *istream, FLAGS_num_workers, *ostream, FLAGS_address, port,
|
||||||
// FLAGS_username, FLAGS_password, FLAGS_database);
|
// FLAGS_username, FLAGS_password, FLAGS_database);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,4 @@
|
|||||||
{
|
{
|
||||||
"num_independent_nodes" : 4111,
|
|
||||||
"num_nodes" : 10000,
|
|
||||||
"independent_label": "User",
|
"independent_label": "User",
|
||||||
"read_probability": 0.5,
|
"read_probability": 0.5,
|
||||||
"queries" : [
|
"queries" : [
|
||||||
|
@ -24,33 +24,14 @@ class LongRunningSuite:
|
|||||||
|
|
||||||
def run(self, scenario, group_name, scenario_name, runner):
|
def run(self, scenario, group_name, scenario_name, runner):
|
||||||
runner.start()
|
runner.start()
|
||||||
# This suite allows empty lines in setup. Those lines separate query
|
|
||||||
# groups. It is guaranteed that groups will be executed sequentially,
|
|
||||||
# but queries in each group are possibly executed concurrently.
|
|
||||||
query_groups = [[]]
|
|
||||||
for query in scenario.get("setup")():
|
|
||||||
if query == "":
|
|
||||||
query_groups.append([])
|
|
||||||
else:
|
|
||||||
query_groups[-1].append(query)
|
|
||||||
if query_groups[-1] == []:
|
|
||||||
query_groups.pop()
|
|
||||||
|
|
||||||
log.info("Executing {} query groups in setup"
|
log.info("Executing setup")
|
||||||
.format(len(query_groups)))
|
runner.setup(scenario.get("setup")(), self.args.num_client_workers)
|
||||||
|
|
||||||
for i, queries in enumerate(query_groups):
|
|
||||||
start_time = time.time()
|
|
||||||
# TODO: number of threads configurable
|
|
||||||
runner.setup(queries, self.args.num_client_workers)
|
|
||||||
log.info("\t{}. group imported in done in {:.2f} seconds".format(
|
|
||||||
i + 1, time.time() - start_time))
|
|
||||||
|
|
||||||
config = next(scenario.get("config")())
|
config = next(scenario.get("config")())
|
||||||
duration = config["duration"]
|
duration = config["duration"]
|
||||||
log.info("Executing run for {} seconds with {} client workers".format(
|
log.info("Executing run for {} seconds with {} client workers".format(
|
||||||
duration, self.args.num_client_workers))
|
duration, self.args.num_client_workers))
|
||||||
# TODO: number of threads configurable
|
|
||||||
results = runner.run(next(scenario.get("run")()), duration,
|
results = runner.run(next(scenario.get("run")()), duration,
|
||||||
self.args.num_client_workers)
|
self.args.num_client_workers)
|
||||||
|
|
||||||
|
@ -91,14 +91,17 @@ class _QuerySuite:
|
|||||||
execute("itersetup")
|
execute("itersetup")
|
||||||
run_result = execute("run",
|
run_result = execute("run",
|
||||||
scenario_config.get("num_client_workers", 1))
|
scenario_config.get("num_client_workers", 1))
|
||||||
add_measurement(run_result, iteration, WALL_TIME)
|
|
||||||
add_measurement(run_result, iteration, CPU_TIME)
|
add_measurement(run_result, iteration, CPU_TIME)
|
||||||
add_measurement(run_result, iteration, MAX_MEMORY)
|
add_measurement(run_result, iteration, MAX_MEMORY)
|
||||||
|
assert len(run_result["groups"]) == 1, \
|
||||||
|
"Multiple groups in run step not yet supported"
|
||||||
|
group = run_result["groups"][0]
|
||||||
|
add_measurement(group, iteration, WALL_TIME)
|
||||||
for measurement in ["parsing_time",
|
for measurement in ["parsing_time",
|
||||||
"plan_execution_time",
|
"plan_execution_time",
|
||||||
"planning_time"] :
|
"planning_time"] :
|
||||||
for i in range(len(run_result.get("metadatas", []))):
|
for i in range(len(group.get("metadatas", []))):
|
||||||
add_measurement(run_result["metadatas"][i], iteration,
|
add_measurement(group["metadatas"][i], iteration,
|
||||||
measurement)
|
measurement)
|
||||||
execute("iterteardown")
|
execute("iterteardown")
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user