Fix race condition in long running stress test

Summary:
The long running stress test had a subtle race condition which caused the test
to fail with an error message like "Runner X edges creation failed because of:
Can't serialize due to concurrent operations.". This situation was caused
because some workers could complete their initialization (initial vertex and
edge creation) before other workers. The workers that completed their
initialization would then proceed to execute the test. In the test they could
execute queries that make global updates on the graph that could interfere with
the concurrently running initialization queries of other workers.

This diff makes the runners wait until all initialization queries are fully
executed before they execute global operations on the graph.

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2647
This commit is contained in:
Matej Ferencevic 2020-01-28 10:53:24 +01:00
parent fd87facf65
commit 84a6ab75cb

View File

@ -1,3 +1,4 @@
#include <atomic>
#include <fstream>
#include <random>
#include <set>
@ -47,8 +48,9 @@ DEFINE_string(stats_file, "", "File into which to write statistics.");
*/
class GraphSession {
public:
GraphSession(int id)
GraphSession(int id, std::atomic<uint64_t> *init_complete)
: id_(id),
init_complete_(init_complete),
indexed_label_(fmt::format("indexed_label{}", id)),
generator_{std::random_device{}()} {
for (int i = 0; i < FLAGS_prop_count; ++i) {
@ -64,6 +66,7 @@ class GraphSession {
private:
uint64_t id_;
std::atomic<uint64_t> *init_complete_;
ClientContextT context_{FLAGS_use_ssl};
std::unique_ptr<ClientT> client_;
@ -341,9 +344,12 @@ class GraphSession {
// initial edge creation
CreateEdges(FLAGS_edge_count);
if (FLAGS_verify > 0) VerifyGraph();
VerifyGraph();
double last_verify = timer_.Elapsed().count();
// notify that we completed our initialization
init_complete_->fetch_add(-1, std::memory_order_acq_rel);
// run rest
while (executed_queries_ < FLAGS_max_queries &&
timer_.Elapsed().count() / 60.0 < FLAGS_max_time) {
@ -356,8 +362,10 @@ class GraphSession {
double ratio_e = (double)edges_.size() / (double)FLAGS_edge_count;
double ratio_v = (double)vertices_.size() / (double)FLAGS_vertex_count;
// try to edit vertices globally
if (FLAGS_global_queries) {
// try to edit vertices globally if all workers completed their
// initialization
if (FLAGS_global_queries &&
init_complete_->load(std::memory_order_acquire) == 0) {
if (Bernoulli(0.01)) {
UpdateGlobalVertices();
}
@ -432,8 +440,10 @@ int main(int argc, char **argv) {
// sessions
std::vector<GraphSession> sessions;
std::atomic<uint64_t> init_complete(FLAGS_worker_count);
sessions.reserve(FLAGS_worker_count);
for (int i = 0; i < FLAGS_worker_count; ++i) {
sessions.emplace_back(i);
sessions.emplace_back(i, &init_complete);
}
// workers