From cd1892acc45ab51e85e99f759895545c2d9478c6 Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Sat, 16 Sep 2017 21:49:26 +0200 Subject: [PATCH] Rewritten long running test in C++. Reviewers: florijan, mislav.bradac Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D784 --- tests/manual/long_running.cpp | 394 ++++++++++++++++++++++++++++++++++ 1 file changed, 394 insertions(+) create mode 100644 tests/manual/long_running.cpp diff --git a/tests/manual/long_running.cpp b/tests/manual/long_running.cpp new file mode 100644 index 000000000..6d86455c8 --- /dev/null +++ b/tests/manual/long_running.cpp @@ -0,0 +1,394 @@ +#include +#include +#include + +#include "communication/bolt/client.hpp" +#include "io/network/network_endpoint.hpp" +#include "io/network/socket.hpp" +#include "utils/exceptions.hpp" +#include "utils/timer.hpp" + +using SocketT = io::network::Socket; +using EndpointT = io::network::NetworkEndpoint; +using ClientT = communication::bolt::Client; +using DecodedValueT = communication::bolt::DecodedValue; +using QueryDataT = communication::bolt::QueryData; +using ExceptionT = communication::bolt::ClientQueryException; + +DEFINE_string(address, "127.0.0.1", "Server address"); +DEFINE_string(port, "7687", "Server port"); +DEFINE_string(username, "", "Username for the database"); +DEFINE_string(password, "", "Password for the database"); + +DEFINE_int32(vertex_count, 0, "The average number of vertices in the graph"); +DEFINE_int32(edge_count, 0, "The average number of edges in the graph"); +DEFINE_int32(vertex_batch, 200, + "The number of vertices to be created simultaneously"); +DEFINE_int32(prop_count, 5, "The max number of properties on a node"); +DEFINE_uint64(max_queries, 1 << 30, "Maximum number of queries to execute"); +DEFINE_int32(max_time, 1, "Maximum execution time in minutes"); +DEFINE_int32(verify, 0, "Interval (seconds) between checking local info"); +DEFINE_int32(worker_count, 1, + "The number of workers that operate on the graph independently"); + +/** + * Encapsulates a Graph and a Bolt session and provides CRUD op functions. + * Also defines a run-loop for a generic exectutor, and a graph state + * verification function. + */ +class GraphSession { + public: + GraphSession(int id) + : id_(id), + indexed_label_(fmt::format("indexed_label{}", id)), + generator_{std::random_device{}()} { + for (int i = 0; i < FLAGS_prop_count; ++i) { + auto label = fmt::format("label{}", i); + labels_.insert(label); + labels_vertices_.insert({label, {}}); + } + + EndpointT endpoint(FLAGS_address, FLAGS_port); + SocketT socket; + + if (!socket.Connect(endpoint)) { + throw utils::BasicException("Couldn't connect to server!"); + } + + client_ = std::make_unique(std::move(socket), FLAGS_username, + FLAGS_password); + } + + private: + uint64_t id_; + std::unique_ptr client_; + + std::set vertices_; + std::set edges_; + + std::string indexed_label_; + std::set labels_; + + std::map> labels_vertices_; + + uint64_t executed_queries_{0}; + std::map query_failures_; + + std::mt19937 generator_; + + utils::Timer timer_; + + private: + double GetRandom() { return std::generate_canonical(generator_); } + + bool Bernoulli(double p) { return GetRandom() < p; } + + template + T RandomElement(std::set &data) { + uint64_t pos = std::floor(GetRandom() * data.size()); + auto it = data.begin(); + std::advance(it, pos); + return *it; + } + + void AddQueryFailure(std::string what) { + auto it = query_failures_.find(what); + if (it != query_failures_.end()) { + ++it->second; + } else { + query_failures_.insert(std::make_pair(what, 1)); + } + } + + QueryDataT Execute(std::string query) { + try { + DLOG(INFO) << "Runner " << id_ << " executing query: " << query; + executed_queries_ += 1; + return client_->Execute(query, {}); + } catch (const ExceptionT &e) { + AddQueryFailure(std::string{e.what()}); + return QueryDataT(); + } + } + + void CreateVertices(uint64_t vertices_count) { + if (vertices_count == 0) return; + auto ret = + Execute(fmt::format("UNWIND RANGE(1, {}) AS r CREATE (n:{} {{id: " + "counter(\"vertex{}\")}}) RETURN min(n.id)", + vertices_count, indexed_label_, id_)); + permanent_assert(ret.records.size() == 1, "Vertices creation failed!"); + uint64_t min_id = ret.records[0][0].ValueInt(); + for (uint64_t i = 0; i < vertices_count; ++i) { + vertices_.insert(min_id + i); + } + } + + void RemoveVertex() { + auto vertex_id = RandomElement(vertices_); + auto ret = + Execute(fmt::format("MATCH (n:{} {{id: {}}}) OPTIONAL MATCH (n)-[r]-() " + "DETACH DELETE n RETURN n.id, labels(n), r.id", + indexed_label_, vertex_id)); + if (ret.records.size() > 0) { + std::set processed_vertices; + for (auto &record : ret.records) { + // remove vertex but note there could be duplicates + auto n_id = record[0].ValueInt(); + if (processed_vertices.insert(n_id).second) { + vertices_.erase(n_id); + for (auto &label : record[1].ValueList()) { + if (label.ValueString() == indexed_label_) { + continue; + } + labels_vertices_[label.ValueString()].erase(n_id); + } + } + // remove edge + auto &edge = record[2]; + if (edge.type() == DecodedValueT::Type::Int) { + edges_.erase(edge.ValueInt()); + } + } + } + } + + void CreateEdges(uint64_t edges_count) { + if (edges_count == 0) return; + double probability = + (double)edges_count / (double)(vertices_.size() * vertices_.size()); + auto ret = Execute(fmt::format( + "MATCH (a:{0}) WITH a MATCH (b:{0}) WITH a, b WHERE rand() < {1} " + "CREATE (a)-[e:EdgeType {{id: counter(\"edge{2}\")}}]->(b) RETURN " + "min(e.id), count(e)", + indexed_label_, probability, id_)); + if (ret.records.size() > 0) { + uint64_t min_id = ret.records[0][0].ValueInt(); + uint64_t count = ret.records[0][1].ValueInt(); + for (uint64_t i = 0; i < count; ++i) { + edges_.insert(min_id + i); + } + } + } + + void CreateEdge() { + auto ret = + Execute(fmt::format("MATCH (from:{} {{id: {}}}), (to:{} {{id: {}}}) " + "CREATE (from)-[e:EdgeType {{id: " + "counter(\"edge{}\")}}]->(to) RETURN e.id", + indexed_label_, RandomElement(vertices_), + indexed_label_, RandomElement(vertices_), id_)); + if (ret.records.size() > 0) { + edges_.insert(ret.records[0][0].ValueInt()); + } + } + + void RemoveEdge() { + auto edge_id = RandomElement(edges_); + auto ret = Execute( + fmt::format("MATCH (:{})-[e {{id: {}}}]->(:{}) DELETE e RETURN e.id", + indexed_label_, edge_id, indexed_label_)); + if (ret.records.size() > 0) { + edges_.erase(edge_id); + } + } + + void AddLabel() { + auto vertex_id = RandomElement(vertices_); + auto label = RandomElement(labels_); + // add a label on a vertex that didn't have that label + // yet (we need that for book-keeping) + auto ret = Execute(fmt::format( + "MATCH (v:{} {{id: {}}}) WHERE not v:{} SET v:{} RETURN v.id", + indexed_label_, vertex_id, label, label)); + if (ret.records.size() > 0) { + labels_vertices_[label].insert(vertex_id); + } + } + + void UpdateGlobalVertices() { + uint64_t vertex_id = *vertices_.rbegin(); + uint64_t lo = std::floor(GetRandom() * vertex_id); + uint64_t hi = std::floor(lo + vertex_id * 0.01); + uint64_t num = std::floor(GetRandom() * (1 << 30)); + Execute( + fmt::format("MATCH (n) WHERE n.id > {} AND n.id < {} SET n.value = {}", + lo, hi, num)); + } + + void UpdateGlobalEdges() { + uint64_t vertex_id = *vertices_.rbegin(); + uint64_t lo = std::floor(GetRandom() * vertex_id); + uint64_t hi = std::floor(lo + vertex_id * 0.01); + uint64_t num = std::floor(GetRandom() * (1 << 30)); + Execute(fmt::format( + "MATCH ()-[e]->() WHERE e.id > {} AND e.id < {} SET e.value = {}", lo, + hi, num)); + } + + /** Checks if the local info corresponds to DB state */ + void VerifyGraph() { + // helper lambda for count verification + auto test_count = [this](std::string query, int64_t count, + std::string message) { + auto ret = Execute(query); + if (ret.records.size() == 0) { + throw utils::BasicException("Couldn't execute count!"); + } + if (ret.records[0][0].ValueInt() != count) { + throw utils::BasicException( + fmt::format(message, id_, count, ret.records[0][0].ValueInt())); + } + }; + + test_count(fmt::format("MATCH (n:{}) RETURN count(n)", indexed_label_), + vertices_.size(), "Runner {} expected {} vertices, found {}!"); + test_count( + fmt::format("MATCH (:{0})-[r]->(:{0}) RETURN count(r)", indexed_label_), + edges_.size(), "Runner {} expected {} edges, found {}!"); + + for (auto &item : labels_vertices_) { + test_count( + fmt::format("MATCH (n:{}:{}) RETURN count(n)", indexed_label_, + item.first), + item.second.size(), + fmt::format( + "Runner {{}} expected {{}} vertices with label '{}', found {{}}!", + item.first)); + } + + // generate report + std::ostringstream report; + report << std::endl + << fmt::format("Runner {} graph verification success:", id_) + << std::endl + << fmt::format("\tExecuted {} queries in {:.2f} seconds", + executed_queries_, timer_.Elapsed().count()) + << std::endl + << fmt::format("\tGraph has {} vertices and {} edges", + vertices_.size(), edges_.size()) + << std::endl; + for (auto &label : labels_) { + report << fmt::format("\tVertices with label '{}': {}", label, + labels_vertices_[label].size()) + << std::endl; + } + if (query_failures_.size() > 0) { + report << "\tQuery failed (reason: count)" << std::endl; + for (auto &item : query_failures_) { + report << fmt::format("\t\t'{}': {}", item.first, item.second) + << std::endl; + } + } + LOG(INFO) << report.str(); + } + + public: + void Run() { + uint64_t vertex_count = FLAGS_vertex_count / FLAGS_worker_count; + uint64_t edge_count = FLAGS_edge_count / FLAGS_worker_count; + + // initial vertex creation + CreateVertices(vertex_count); + + // initial edge creation + CreateEdges(edge_count); + + double last_verify = timer_.Elapsed().count(); + + // run rest + while (executed_queries_ < FLAGS_max_queries && + timer_.Elapsed().count() / 60.0 < FLAGS_max_time) { + if (FLAGS_verify > 0 && + timer_.Elapsed().count() - last_verify > FLAGS_verify) { + VerifyGraph(); + last_verify = timer_.Elapsed().count(); + } + + double ratio_e = (double)edges_.size() / (double)edge_count; + double ratio_v = (double)vertices_.size() / (double)vertex_count; + + // try to edit vertices globally + if (Bernoulli(0.01)) { + UpdateGlobalVertices(); + } + + // try to edit edges globally + if (Bernoulli(0.01)) { + UpdateGlobalEdges(); + } + + // prefer adding/removing edges whenever there is an edge + // disbalance and there is enough vertices + if (ratio_v > 0.5 && std::fabs(1.0 - ratio_e) > 0.2) { + if (Bernoulli(ratio_e / 2.0)) { + RemoveEdge(); + } else { + CreateEdge(); + } + continue; + } + + // if we are near vertex balance, we can also do updates + // instad of update / deletes + if (std::fabs(1.0 - ratio_v) < 0.5 && Bernoulli(0.5)) { + AddLabel(); + continue; + } + + if (Bernoulli(ratio_v / 2.0)) { + RemoveVertex(); + } else { + CreateVertices(1); + } + } + } +}; + +int main(int argc, char **argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + + permanent_assert(FLAGS_vertex_count > 0, + "Vertex count must be greater than 0!"); + permanent_assert(FLAGS_edge_count > 0, "Edge count must be greater than 0!"); + + LOG(INFO) << "Starting Memgraph long running test"; + + // create client + EndpointT endpoint(FLAGS_address, FLAGS_port); + SocketT socket; + if (!socket.Connect(endpoint)) { + throw utils::BasicException("Couldn't connect to server!"); + } + ClientT client(std::move(socket), FLAGS_username, FLAGS_password); + + // cleanup and create indexes + client.Execute("MATCH (n) DETACH DELETE n", {}); + for (int i = 0; i < FLAGS_worker_count; ++i) { + client.Execute(fmt::format("CREATE INDEX ON :indexed_label{}(id)", i), {}); + client.Execute(fmt::format("RETURN counterSet(\"vertex{}\", 0)", i), {}); + client.Execute(fmt::format("RETURN counterSet(\"edge{}\", 0)", i), {}); + } + + // close client + client.Close(); + + // workers + std::vector threads; + + for (int i = 0; i < FLAGS_worker_count; ++i) { + threads.push_back(std::thread([&, i]() { + GraphSession session(i); + session.Run(); + })); + } + + for (int i = 0; i < FLAGS_worker_count; ++i) { + threads[i].join(); + } + + LOG(INFO) << "All query runners done"; + + return 0; +}