From 9eb01f363f8627f17a440bfa0004e975f5f7b21b Mon Sep 17 00:00:00 2001 From: Matija Santl <matija.santl@memgraph.com> Date: Wed, 14 Feb 2018 15:20:28 +0100 Subject: [PATCH] Add sleep in remote pull operator Summary: Added a hidden flag, a int32 value, that represents the number of milliseconds the thread that pulls remote results should sleep in case when the local results are exhausted but there are still some remote results that are pending. Reviewers: teon.banek, florijan Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1198 --- src/query/plan/operator.cpp | 9 ++++++++- src/query/plan/operator.hpp | 2 ++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 316e3c29c..d4ddb9b50 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -24,6 +24,9 @@ #include "query/path.hpp" #include "utils/exceptions.hpp" +DEFINE_HIDDEN_int32(remote_pull_sleep, 2, + "Sleep between remote result pulling in milliseconds"); + // macro for the default implementation of LogicalOperator::Accept // that accepts the visitor and visits it's input_ operator #define ACCEPT_WITH_INPUT(class_name) \ @@ -2830,11 +2833,15 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) { break; } - // If there are no remote results available, pull and return local + // If there are no remote results available, try to pull and return local // results. if (input_cursor_ && input_cursor_->Pull(frame, context)) { return true; } + + // If there aren't any local/remote results available, sleep. + std::this_thread::sleep_for( + std::chrono::milliseconds(FLAGS_remote_pull_sleep)); } } diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp index f938d818c..1c04186a7 100644 --- a/src/query/plan/operator.hpp +++ b/src/query/plan/operator.hpp @@ -26,6 +26,8 @@ #include "utils/hashing/fnv.hpp" #include "utils/visitor.hpp" +DECLARE_int32(remote_pull_sleep); + namespace database { class GraphDbAccessor; }