Avoid usage of time.sleep (#434)

e2e python: added tooling function around `time.sleep()` that stops as soon as condition is fulfilled and will raise assert if timeout is reached
This commit is contained in:
Jeremy B 2022-07-08 10:47:18 +02:00 committed by GitHub
parent 86b1688192
commit 063e297e1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 73 additions and 48 deletions

16
tests/e2e/mg_utils.py Normal file
View File

@ -0,0 +1,16 @@
import time
def mg_sleep_and_assert(expected_value, function_to_retrieve_data, max_duration=20, time_between_attempt=0.05):
result = function_to_retrieve_data()
start_time = time.time()
while result != expected_value:
current_time = time.time()
duration = current_time - start_time
if duration > max_duration:
assert False, " mg_sleep_and_assert has tried for too long and did not get the expected result!"
time.sleep(time_between_attempt)
result = function_to_retrieve_data()
return result

View File

@ -12,3 +12,4 @@ copy_e2e_python_files(replication_show show.py)
copy_e2e_python_files(replication_show show_while_creating_invalid_state.py)
copy_e2e_python_files_from_parent_folder(replication_show ".." memgraph.py)
copy_e2e_python_files_from_parent_folder(replication_show ".." interactive_mg_runner.py)
copy_e2e_python_files_from_parent_folder(replication_show ".." mg_utils.py)

View File

@ -15,6 +15,7 @@ import pytest
import time
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
@pytest.mark.parametrize(
@ -83,7 +84,6 @@ def test_show_replicas_while_inserting_data(connection):
# 1/
execute_and_fetch_all(cursor, "CREATE (n1:Number {name: 'forty_two', value:42});")
time.sleep(1)
# 2/
expected_data = {
@ -91,7 +91,11 @@ def test_show_replicas_while_inserting_data(connection):
("replica_2", "127.0.0.1:10002", "sync", 4, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 4, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
def retrieve_data():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
# 3/

View File

@ -13,9 +13,9 @@ import sys
import os
import pytest
import time
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
import interactive_mg_runner
import mgclient
import tempfile
@ -111,13 +111,15 @@ def test_show_replicas(connection):
interactive_mg_runner.stop(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_4")
# We leave some time for the main to realise the replicas are down.
time.sleep(2)
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
def retrieve_data():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"),
("replica_3", "127.0.0.1:10003", "async", 0, 0, "invalid"),
("replica_4", "127.0.0.1:10004", "async", 0, 0, "invalid"),
}
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
@ -201,7 +203,6 @@ def test_basic_recovery(connection):
# 2/
interactive_mg_runner.kill(CONFIGURATION, "main")
time.sleep(2)
# 3/
interactive_mg_runner.start(CONFIGURATION, "main")
@ -209,9 +210,10 @@ def test_basic_recovery(connection):
check_roles()
# 4/
# We leave some time for the main to recover.
time.sleep(2)
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
def retrieve_data():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
# 5/
@ -220,8 +222,6 @@ def test_basic_recovery(connection):
# 6/
execute_and_fetch_all(cursor, "CREATE (p1:Number {name:'Magic', value:42})")
interactive_mg_runner.kill(CONFIGURATION, "main")
time.sleep(2)
interactive_mg_runner.start(CONFIGURATION, "main")
cursor = connection(7687, "main").cursor()
check_roles()
@ -256,28 +256,35 @@ def test_basic_recovery(connection):
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';")
interactive_mg_runner.start(CONFIGURATION, "replica_3")
time.sleep(2)
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 6, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 6, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 6, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 6, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
def retrieve_data2():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = mg_sleep_and_assert(expected_data, retrieve_data2)
assert actual_data == expected_data
for index in (1, 2, 3, 4):
assert interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK) == res_from_main
# 11/
interactive_mg_runner.kill(CONFIGURATION, "replica_1")
time.sleep(1)
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"),
("replica_2", "127.0.0.1:10002", "sync", 6, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 6, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 6, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
def retrieve_data3():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = mg_sleep_and_assert(expected_data, retrieve_data3)
assert actual_data == expected_data
# 12/
@ -383,7 +390,6 @@ def test_basic_recovery_when_replica_is_kill_when_main_is_down(connection):
# 2/
interactive_mg_runner.kill(CONFIGURATION, "main")
interactive_mg_runner.kill(CONFIGURATION, "replica_2")
time.sleep(2)
# 3/
interactive_mg_runner.start(CONFIGURATION, "main")

View File

@ -9,3 +9,5 @@ copy_streams_e2e_python_files(streams_owner_tests.py)
copy_streams_e2e_python_files(pulsar_streams_tests.py)
add_subdirectory(transformations)
copy_e2e_python_files_from_parent_folder(streams ".." mg_utils.py)

View File

@ -13,6 +13,7 @@ import mgclient
import pytest
import time
from mg_utils import mg_sleep_and_assert
from multiprocessing import Manager, Process, Value
# These are the indices of the different values in the result of SHOW STREAM
@ -427,8 +428,10 @@ def test_start_stream_with_batch_limit(connection, stream_creator, messages_send
thread_stream_running = Process(target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT))
thread_stream_running.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
def is_running():
return get_is_running(cursor, STREAM_NAME)
assert mg_sleep_and_assert(True, is_running)
messages_sender(BATCH_LIMIT - 1)
@ -438,10 +441,8 @@ def test_start_stream_with_batch_limit(connection, stream_creator, messages_send
# We send a last message to reach the batch_limit
messages_sender(1)
time.sleep(2)
# We check that the stream has correctly stoped.
assert not get_is_running(cursor, STREAM_NAME)
assert not mg_sleep_and_assert(False, is_running)
def test_start_stream_with_batch_limit_timeout(connection, stream_creator):
@ -504,8 +505,11 @@ def test_start_stream_with_batch_limit_while_check_running(
# 1/
thread_stream_check = Process(target=start_check_stream, daemon=True, args=(STREAM_NAME, BATCH_LIMIT, TIMEOUT))
thread_stream_check.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
def is_running():
return get_is_running(cursor, STREAM_NAME)
assert mg_sleep_and_assert(True, is_running)
with pytest.raises(mgclient.DatabaseError):
start_stream_with_limit(cursor, STREAM_NAME, BATCH_LIMIT, timeout=TIMEOUT)
@ -521,13 +525,12 @@ def test_start_stream_with_batch_limit_while_check_running(
target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT + 1, TIMEOUT)
) # Sending BATCH_LIMIT + 1 messages as BATCH_LIMIT messages have already been sent during the CHECK STREAM (and not consumed)
thread_stream_running.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
assert mg_sleep_and_assert(True, is_running)
message_sender(SIMPLE_MSG)
time.sleep(2)
assert not get_is_running(cursor, STREAM_NAME)
assert not mg_sleep_and_assert(False, is_running)
def test_check_while_stream_with_batch_limit_running(connection, stream_creator, message_sender):
@ -557,8 +560,11 @@ def test_check_while_stream_with_batch_limit_running(connection, stream_creator,
)
start_time = time.time()
thread_stream_running.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
def is_running():
return get_is_running(cursor, STREAM_NAME)
assert mg_sleep_and_assert(True, is_running)
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, f"CHECK STREAM {STREAM_NAME} BATCH_LIMIT {BATCH_LIMIT} TIMEOUT {TIMEOUT}")
@ -567,23 +573,17 @@ def test_check_while_stream_with_batch_limit_running(connection, stream_creator,
assert (end_time - start_time) < 0.8 * TIMEOUT, "The CHECK STREAM has probably thrown due to timeout!"
message_sender(SIMPLE_MSG)
time.sleep(2)
assert not get_is_running(cursor, STREAM_NAME)
assert not mg_sleep_and_assert(False, is_running)
# 2/
thread_stream_check = Process(target=start_check_stream, daemon=True, args=(STREAM_NAME, BATCH_LIMIT, TIMEOUT))
start_time = time.time()
thread_stream_check.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
assert mg_sleep_and_assert(True, is_running)
message_sender(SIMPLE_MSG)
time.sleep(2)
end_time = time.time()
assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The CHECK STREAM has probably thrown due to timeout!"
assert not get_is_running(cursor, STREAM_NAME)
assert not mg_sleep_and_assert(False, is_running)
def test_start_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator):

View File

@ -15,6 +15,7 @@ import sys
import pytest
import mgclient
import time
from mg_utils import mg_sleep_and_assert
from multiprocessing import Process, Value
import common
@ -465,8 +466,11 @@ def test_start_stream_with_batch_limit_while_check_running(kafka_producer, kafka
def setup_function(start_check_stream, cursor, stream_name, batch_limit, timeout):
thread_stream_check = Process(target=start_check_stream, daemon=True, args=(stream_name, batch_limit, timeout))
thread_stream_check.start()
time.sleep(2)
assert common.get_is_running(cursor, stream_name)
def is_running():
return common.get_is_running(cursor, stream_name)
assert mg_sleep_and_assert(True, is_running)
message_sender(common.SIMPLE_MSG)
thread_stream_check.join()

View File

@ -37,7 +37,6 @@ def test_simple(pulsar_client, pulsar_topics, connection, transformation):
f"CREATE PULSAR STREAM test TOPICS '{','.join(pulsar_topics)}' TRANSFORM {transformation}",
)
common.start_stream(cursor, "test")
time.sleep(5)
for topic in pulsar_topics:
producer = pulsar_client.create_producer(
@ -66,8 +65,6 @@ def test_separate_consumers(pulsar_client, pulsar_topics, connection, transforma
for stream_name in stream_names:
common.start_stream(cursor, stream_name)
time.sleep(5)
for topic in pulsar_topics:
producer = pulsar_client.create_producer(topic, send_timeout_millis=60000)
producer.send(common.SIMPLE_MSG)
@ -89,7 +86,6 @@ def test_start_from_latest_messages(pulsar_client, pulsar_topics, connection):
f"CREATE PULSAR STREAM test TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple",
)
common.start_stream(cursor, "test")
time.sleep(1)
def assert_message_not_consumed(message):
vertices_with_msg = common.execute_and_fetch_all(
@ -154,7 +150,6 @@ def test_check_stream(pulsar_client, pulsar_topics, connection, transformation):
f"CREATE PULSAR STREAM test TOPICS {pulsar_topics[0]} TRANSFORM {transformation} BATCH_SIZE {BATCH_SIZE}",
)
common.start_stream(cursor, "test")
time.sleep(1)
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
@ -308,7 +303,6 @@ def test_restart_after_error(pulsar_client, pulsar_topics, connection):
)
common.start_stream(cursor, "test_stream")
time.sleep(1)
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
@ -317,7 +311,6 @@ def test_restart_after_error(pulsar_client, pulsar_topics, connection):
assert common.timed_wait(lambda: not common.get_is_running(cursor, "test_stream"))
common.start_stream(cursor, "test_stream")
time.sleep(1)
producer.send(b"CREATE (n:VERTEX { id : 42 })")
assert common.check_one_result_row(cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n")
@ -332,7 +325,6 @@ def test_service_url(pulsar_client, pulsar_topics, connection, transformation):
f"CREATE PULSAR STREAM test TOPICS {','.join(pulsar_topics)} TRANSFORM {transformation} SERVICE_URL '{LOCAL}'",
)
common.start_stream(cursor, "test")
time.sleep(5)
for topic in pulsar_topics:
producer = pulsar_client.create_producer(