// TODO: work in progress. #include #include #include #include #include #include #include #include #include #include #include #include #include #include "io/network/utils.hpp" #include "utils/algorithm.hpp" #include "utils/timer.hpp" #include "long_running_common.hpp" using communication::bolt::Edge; using communication::bolt::Value; using communication::bolt::Vertex; struct VertexAndEdges { Vertex vertex; std::vector edges; std::vector vertices; }; const std::string INDEPENDENT_LABEL = "User"; class PokecClient : public TestClient { public: PokecClient(int id, std::vector to_remove, nlohmann::json config) : TestClient(), rg_(id), config_(config), to_remove_(to_remove) {} private: std::mt19937 rg_; nlohmann::json config_; std::vector to_remove_; std::vector removed_; auto MatchVertex(const std::string &label, int64_t id) { return Execute(fmt::format("MATCH (n :{} {{id : $id}}) RETURN n", label), {{"id", id}}); } auto MatchNeighbours(const std::string &label, int64_t id) { return Execute( fmt::format("MATCH (n :{} {{id : $id}})-[e]-(m) RETURN n, e, m", label), {{"id", id}}); } auto DetachDeleteVertex(const std::string &label, int64_t id) { return Execute( fmt::format("MATCH (n :{} {{id : $id}}) DETACH DELETE n", label), {{"id", id}}); } auto CreateVertex(const Vertex &vertex) { std::stringstream os; os << "CREATE (n :"; utils::PrintIterable(os, vertex.labels, ":"); os << " {"; utils::PrintIterable( os, vertex.properties, ", ", [&](auto &stream, const auto &pair) { if (pair.second.type() == Value::Type::String) { stream << pair.first << ": \"" << pair.second << "\""; } else { stream << pair.first << ": " << pair.second; } }); os << "})"; return Execute(os.str(), {}, "CREATE (n :labels... {...})"); } auto GetAverageAge2(int64_t id) { return Execute( "MATCH (n :User {id: $id})-[]-(m) " "RETURN AVG(n.age + m.age)", {{"id", id}}); } auto GetAverageAge3(int64_t id) { return Execute( "MATCH (n :User {id: $id})-[]-(m)-[]-(k) " "RETURN AVG(n.age + m.age + k.age)", {{"id", id}}); } auto CreateEdge(const Vertex &from, const std::string &from_label, int64_t from_id, const std::string &to_label, int64_t to_id, const Edge &edge) { std::stringstream os; os << fmt::format("MATCH (n :{} {{id : {}}}) ", from_label, from_id); os << fmt::format("MATCH (m :{} {{id : {}}}) ", to_label, to_id); os << "CREATE (n)"; if (edge.to == from.id) { os << "<-"; } else { os << "-"; } os << "[:" << edge.type << " {"; utils::PrintIterable( os, edge.properties, ", ", [&](auto &stream, const auto &pair) { if (pair.second.type() == Value::Type::String) { stream << pair.first << ": \"" << pair.second << "\""; } else { stream << pair.first << ": " << pair.second; } }); os << "}]"; if (edge.from == from.id) { os << "->"; } else { os << "-"; } os << "(m) "; os << "RETURN n.id"; auto ret = Execute(os.str(), {}, "MATCH (n :label {id: ...}) MATCH (m :label {id: ...}) " "CREATE (n)-[:type ...]-(m)"); CHECK(ret->records.size() == 1U) << "from_id: " << from_id << " " << "to_id: " << to_id << " " << "ret.records.size(): " << ret->records.size(); return ret; } VertexAndEdges RetrieveAndDeleteVertex(const std::string &label, int64_t id) { auto vertex_record = MatchVertex(label, id)->records; CHECK(vertex_record.size() == 1U) << "id: " << id << " " << "vertex_record.size(): " << vertex_record.size(); auto records = MatchNeighbours(label, id)->records; DetachDeleteVertex(label, id); std::vector edges; edges.reserve(records.size()); for (const auto &record : records) { edges.push_back(record[1].ValueEdge()); } std::vector vertices; vertices.reserve(records.size()); for (const auto &record : records) { vertices.push_back(record[2].ValueVertex()); } return {vertex_record[0][0].ValueVertex(), edges, vertices}; } void ReturnVertexAndEdges(const VertexAndEdges &vertex_and_edges, const std::string &label) { int num_queries = 0; CreateVertex(vertex_and_edges.vertex); ++num_queries; for (int i = 0; i < static_cast(vertex_and_edges.vertices.size()); ++i) { auto records = CreateEdge( vertex_and_edges.vertex, label, vertex_and_edges.vertex.properties.at("id").ValueInt(), label, vertex_and_edges.vertices[i].properties.at("id").ValueInt(), vertex_and_edges.edges[i]) ->records; CHECK(records.size() == 1U) << "Graph in invalid state " << vertex_and_edges.vertex.properties.at("id"); ++num_queries; } } public: virtual void Step() override { std::uniform_real_distribution<> real_dist(0.0, 1.0); if (real_dist(rg_) < config_["read_probability"]) { std::uniform_int_distribution<> read_query_dist(0, 1); int id = real_dist(rg_) * to_remove_.size(); switch (read_query_dist(rg_)) { case 0: GetAverageAge2(id); break; case 1: GetAverageAge3(id); break; default: LOG(FATAL) << "Should not get here"; } } else { auto remove_random = [&](auto &v) { CHECK(v.size()); std::uniform_int_distribution<> int_dist(0, v.size() - 1); std::swap(v.back(), v[int_dist(rg_)]); auto ret = v.back(); v.pop_back(); return ret; }; if (real_dist(rg_) < static_cast(removed_.size()) / (removed_.size() + to_remove_.size())) { auto vertices_and_edges = remove_random(removed_); ReturnVertexAndEdges(vertices_and_edges, INDEPENDENT_LABEL); to_remove_.push_back( vertices_and_edges.vertex.properties["id"].ValueInt()); } else { auto node_id = remove_random(to_remove_); auto ret = RetrieveAndDeleteVertex(INDEPENDENT_LABEL, node_id); removed_.push_back(ret); } } } }; int64_t NumNodes(Client &client, const std::string &label) { auto result = ExecuteNTimesTillSuccess( client, "MATCH (n :" + label + ") RETURN COUNT(n) as cnt", {}, MAX_RETRIES); return result.first.records[0][0].ValueInt(); } std::vector Neighbours(Client &client, const std::string &label, int64_t id) { auto result = ExecuteNTimesTillSuccess(client, "MATCH (n :" + label + " {id: " + std::to_string(id) + "})-[e]-(m) RETURN m.id", {}, MAX_RETRIES); std::vector ret; for (const auto &record : result.first.records) { ret.push_back(record[0].ValueInt()); } return ret; } std::vector IndependentSet(Client &client, const std::string &label) { const int64_t num_nodes = NumNodes(client, label); std::vector independent_nodes_ids; std::vector ids; std::unordered_set independent; for (int64_t i = 1; i <= num_nodes; ++i) { ids.push_back(i); independent.insert(i); } { std::mt19937 mt; std::shuffle(ids.begin(), ids.end(), mt); } for (auto i : ids) { if (independent.find(i) == independent.end()) continue; independent.erase(i); std::vector neighbour_ids = Neighbours(client, label, i); independent_nodes_ids.push_back(i); for (auto j : neighbour_ids) { independent.erase(j); } } LOG(INFO) << "Number of nodes: " << num_nodes << "\n" << "Number of independent nodes: " << independent_nodes_ids.size(); return independent_nodes_ids; } int main(int argc, char **argv) { gflags::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); communication::Init(); nlohmann::json config; std::cin >> config; auto independent_nodes_ids = [&] { Endpoint endpoint(io::network::ResolveHostname(FLAGS_address), FLAGS_port); ClientContext context(FLAGS_use_ssl); Client client(&context); if (!client.Connect(endpoint, FLAGS_username, FLAGS_password)) { LOG(FATAL) << "Couldn't connect to " << endpoint; } return IndependentSet(client, INDEPENDENT_LABEL); }(); int64_t next_to_assign = 0; std::vector> clients; clients.reserve(FLAGS_num_workers); for (int i = 0; i < FLAGS_num_workers; ++i) { int64_t size = independent_nodes_ids.size(); int64_t next_next_to_assign = next_to_assign + size / FLAGS_num_workers + (i < size % FLAGS_num_workers); std::vector to_remove( independent_nodes_ids.begin() + next_to_assign, independent_nodes_ids.begin() + next_next_to_assign); LOG(INFO) << next_to_assign << " " << next_next_to_assign; next_to_assign = next_next_to_assign; clients.emplace_back(std::make_unique(i, to_remove, config)); } RunMultithreadedTest(clients); return 0; }