From 84a6ab75cb960b0786253589ea0354a3859fc8c4 Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Tue, 28 Jan 2020 10:53:24 +0100 Subject: [PATCH] Fix race condition in long running stress test Summary: The long running stress test had a subtle race condition which caused the test to fail with an error message like "Runner X edges creation failed because of: Can't serialize due to concurrent operations.". This situation was caused because some workers could complete their initialization (initial vertex and edge creation) before other workers. The workers that completed their initialization would then proceed to execute the test. In the test they could execute queries that make global updates on the graph that could interfere with the concurrently running initialization queries of other workers. This diff makes the runners wait until all initialization queries are fully executed before they execute global operations on the graph. Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2647 --- tests/stress/long_running.cpp | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/tests/stress/long_running.cpp b/tests/stress/long_running.cpp index 651928138..7be55f25c 100644 --- a/tests/stress/long_running.cpp +++ b/tests/stress/long_running.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -47,8 +48,9 @@ DEFINE_string(stats_file, "", "File into which to write statistics."); */ class GraphSession { public: - GraphSession(int id) + GraphSession(int id, std::atomic *init_complete) : id_(id), + init_complete_(init_complete), indexed_label_(fmt::format("indexed_label{}", id)), generator_{std::random_device{}()} { for (int i = 0; i < FLAGS_prop_count; ++i) { @@ -64,6 +66,7 @@ class GraphSession { private: uint64_t id_; + std::atomic *init_complete_; ClientContextT context_{FLAGS_use_ssl}; std::unique_ptr client_; @@ -341,9 +344,12 @@ class GraphSession { // initial edge creation CreateEdges(FLAGS_edge_count); - if (FLAGS_verify > 0) VerifyGraph(); + VerifyGraph(); double last_verify = timer_.Elapsed().count(); + // notify that we completed our initialization + init_complete_->fetch_add(-1, std::memory_order_acq_rel); + // run rest while (executed_queries_ < FLAGS_max_queries && timer_.Elapsed().count() / 60.0 < FLAGS_max_time) { @@ -356,8 +362,10 @@ class GraphSession { double ratio_e = (double)edges_.size() / (double)FLAGS_edge_count; double ratio_v = (double)vertices_.size() / (double)FLAGS_vertex_count; - // try to edit vertices globally - if (FLAGS_global_queries) { + // try to edit vertices globally if all workers completed their + // initialization + if (FLAGS_global_queries && + init_complete_->load(std::memory_order_acquire) == 0) { if (Bernoulli(0.01)) { UpdateGlobalVertices(); } @@ -432,8 +440,10 @@ int main(int argc, char **argv) { // sessions std::vector sessions; + std::atomic init_complete(FLAGS_worker_count); + sessions.reserve(FLAGS_worker_count); for (int i = 0; i < FLAGS_worker_count; ++i) { - sessions.emplace_back(i); + sessions.emplace_back(i, &init_complete); } // workers