Add test for ensuring that timeouts in Receive are generally happening as-expected
This commit is contained in:
parent
36432ce6d0
commit
d2f5235168
@ -46,17 +46,14 @@ class ThriftHandle {
|
||||
// TODO(gabor) The RSM map should not be a part of this class.
|
||||
// std::map<boost::uuids::uuid, uint16_t /*this should be the actual RSM*/> rsm_map_;
|
||||
|
||||
// this is duplicated between the ThriftTransport and here
|
||||
// because it's relatively simple and there's no need to
|
||||
// avoid the duplication as of the time of implementation.
|
||||
public:
|
||||
ThriftHandle(Address our_address) : address_(our_address) {}
|
||||
|
||||
Time Now() const {
|
||||
auto nano_time = std::chrono::system_clock::now();
|
||||
return std::chrono::time_point_cast<std::chrono::microseconds>(nano_time);
|
||||
}
|
||||
|
||||
public:
|
||||
ThriftHandle(Address our_address) : address_(our_address) {}
|
||||
|
||||
template <Message M>
|
||||
void DeliverMessage(Address to_address, Address from_address, RequestId request_id, M &&message) {
|
||||
std::any message_any(std::move(message));
|
||||
@ -114,8 +111,23 @@ class ThriftHandle {
|
||||
// TODO(tyler) block for the specified duration on the Inbox's receipt of a message of this type.
|
||||
std::unique_lock lock(mu_);
|
||||
|
||||
Time before = Now();
|
||||
|
||||
while (can_receive_.empty()) {
|
||||
std::cv_status cv_status_value = cv_.wait_for(lock, timeout);
|
||||
Time now = Now();
|
||||
|
||||
// protection against non-monotonic timesources
|
||||
auto maxed_now = std::max(now, before);
|
||||
auto elapsed = maxed_now - before;
|
||||
|
||||
if (timeout < elapsed) {
|
||||
return TimedOut{};
|
||||
}
|
||||
|
||||
Duration relative_timeout = timeout - elapsed;
|
||||
|
||||
std::cv_status cv_status_value = cv_.wait_for(lock, relative_timeout);
|
||||
|
||||
if (cv_status_value == std::cv_status::timeout) {
|
||||
return TimedOut{};
|
||||
}
|
||||
|
@ -54,10 +54,7 @@ class ThriftTransport {
|
||||
return thrift_handle_->template Send<M>(address, address_, request_id, message);
|
||||
}
|
||||
|
||||
Time Now() const {
|
||||
auto nano_time = std::chrono::system_clock::now();
|
||||
return std::chrono::time_point_cast<std::chrono::microseconds>(nano_time);
|
||||
}
|
||||
Time Now() const { return thrift_handle_->Now(); }
|
||||
|
||||
bool ShouldShutDown() const { return false; }
|
||||
|
||||
|
@ -9,6 +9,7 @@
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <chrono>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
@ -26,6 +27,7 @@ using memgraph::io::FuturePromisePair;
|
||||
using memgraph::io::RequestEnvelope;
|
||||
using memgraph::io::ResponseEnvelope;
|
||||
using memgraph::io::ResponseResult;
|
||||
using memgraph::io::Time;
|
||||
using memgraph::io::thrift::ThriftHandle;
|
||||
|
||||
struct TestMessage {
|
||||
@ -37,8 +39,19 @@ TEST(Thrift, ThriftHandleTimeout) {
|
||||
auto handle = ThriftHandle{our_address};
|
||||
|
||||
// assert timeouts fire
|
||||
auto should_timeout = handle.Receive<TestMessage>(Duration{});
|
||||
MG_ASSERT(should_timeout.HasError());
|
||||
Duration zero_timeout = Duration{};
|
||||
auto should_timeout_1 = handle.Receive<TestMessage>(zero_timeout);
|
||||
MG_ASSERT(should_timeout_1.HasError());
|
||||
|
||||
Duration ten_ms = std::chrono::microseconds{10000};
|
||||
Time before = handle.Now();
|
||||
|
||||
auto should_timeout_2 = handle.Receive<TestMessage>(ten_ms);
|
||||
MG_ASSERT(should_timeout_2.HasError());
|
||||
|
||||
Time after = handle.Now();
|
||||
|
||||
MG_ASSERT(after - before >= ten_ms);
|
||||
}
|
||||
|
||||
TEST(Thrift, ThriftHandleReceive) {
|
||||
|
Loading…
Reference in New Issue
Block a user