mirror of
https://github.com/mirror/wget.git
synced 2025-01-28 05:10:35 +08:00
PEP8'ify the Python Test Suite
* testenv/conf/{__init__,authentication,files_crawled, hook_sample,reject_header,server_files}.py: Aesthetic changes to meet Python PEP8 guidelines * testenv/exc/{server_error,test_failed}.py: Same * testenv/misc/{colour_terminal,wget_file}.py: Same * testenv/server/http/http_server.py: Same * testenv/test/base_test.py: Same
This commit is contained in:
parent
c6af2fddee
commit
8e0dd0d870
@ -3,6 +3,7 @@ import os
|
||||
# this file implements the mechanism of conf class auto-registration,
|
||||
# don't modify this file if you have no idea what you're doing
|
||||
|
||||
|
||||
def gen_hook():
|
||||
hook_table = {}
|
||||
|
||||
|
@ -6,5 +6,8 @@ class ServerError (Exception):
|
||||
def __init__(self, err_message):
|
||||
self.err_message = err_message
|
||||
|
||||
|
||||
class AuthError (ServerError):
|
||||
""" A custom exception raised byt he servers when authentication of the
|
||||
request fails. """
|
||||
pass
|
||||
|
@ -29,6 +29,7 @@ T_COLORS = {
|
||||
system = True if platform.system() in ('Linux', 'Darwin') else False
|
||||
check = False if getenv("MAKE_CHECK") == 'True' else True
|
||||
|
||||
|
||||
def printer(color, string):
|
||||
if sys.stdout.isatty() and system and check:
|
||||
print(T_COLORS.get(color) + string + T_COLORS.get('ENDC'))
|
||||
|
@ -37,11 +37,16 @@ class HTTPSServer (StoppableHTTPServer):
|
||||
def __init__(self, address, handler):
|
||||
import ssl
|
||||
BaseServer.__init__(self, address, handler)
|
||||
# step one up because test suite change directory away from $srcdir (don't do that !!!)
|
||||
CERTFILE = os.path.abspath(os.path.join('..', os.getenv('srcdir', '.'), 'certs', 'server-cert.pem'))
|
||||
KEYFILE = os.path.abspath(os.path.join('..', os.getenv('srcdir', '.'), 'certs', 'server-key.pem'))
|
||||
fop = open (CERTFILE)
|
||||
print (fop.readline())
|
||||
# step one up because test suite change directory away from $srcdir
|
||||
# (don't do that !!!)
|
||||
CERTFILE = os.path.abspath(os.path.join('..',
|
||||
os.getenv('srcdir', '.'),
|
||||
'certs',
|
||||
'server-cert.pem'))
|
||||
KEYFILE = os.path.abspath(os.path.join('..',
|
||||
os.getenv('srcdir', '.'),
|
||||
'certs',
|
||||
'server-key.pem'))
|
||||
self.socket = ssl.wrap_socket(
|
||||
sock=socket.socket(self.address_family, self.socket_type),
|
||||
ssl_version=ssl.PROTOCOL_TLSv1,
|
||||
@ -284,7 +289,8 @@ class _Handler (BaseHTTPRequestHandler):
|
||||
return True if resp == params['response'] else False
|
||||
|
||||
def authorize_digest(self, auth_header, auth_rule):
|
||||
if auth_header is None or auth_header.split(' ')[0].lower() != 'digest':
|
||||
if auth_header is None or \
|
||||
auth_header.split(' ')[0].lower() != 'digest':
|
||||
return False
|
||||
else:
|
||||
self.user = auth_rule.auth_user
|
||||
@ -321,36 +327,41 @@ class _Handler (BaseHTTPRequestHandler):
|
||||
auth_header = self.headers.get("Authorization")
|
||||
required_auth = auth_rule.auth_type.lower()
|
||||
if required_auth == "both" or required_auth == "both_inline":
|
||||
auth_type = auth_header.split(' ')[0].lower() if auth_header else required_auth
|
||||
if auth_header:
|
||||
auth_type = auth_header.split(' ')[0].lower()
|
||||
else:
|
||||
auth_type = required_auth
|
||||
else:
|
||||
auth_type = required_auth
|
||||
try:
|
||||
assert hasattr(self, "authorize_" + auth_type)
|
||||
is_auth = getattr (self, "authorize_" + auth_type) (auth_header, auth_rule)
|
||||
is_auth = getattr(self, "authorize_" + auth_type)(auth_header,
|
||||
auth_rule)
|
||||
except AssertionError:
|
||||
raise AuthError ("Authentication Mechanism " + auth_type + " not supported")
|
||||
raise AuthError("Authentication Mechanism %s not supported" %
|
||||
auth_type)
|
||||
except AttributeError as ae:
|
||||
raise AuthError(ae.__str__())
|
||||
if is_auth is False:
|
||||
raise AuthError("Unable to Authenticate")
|
||||
|
||||
|
||||
def ExpectHeader(self, header_obj):
|
||||
exp_headers = header_obj.headers
|
||||
for header_line in exp_headers:
|
||||
header_recd = self.headers.get(header_line)
|
||||
if header_recd is None or header_recd != exp_headers[header_line]:
|
||||
self.send_error (400, "Expected Header " + header_line + " not found")
|
||||
self.send_error(400, "Expected Header %s not found" %
|
||||
header_line)
|
||||
self.finish_headers()
|
||||
raise ServerError("Header " + header_line + " not found")
|
||||
|
||||
|
||||
def RejectHeader(self, header_obj):
|
||||
rej_headers = header_obj.headers
|
||||
for header_line in rej_headers:
|
||||
header_recd = self.headers.get(header_line)
|
||||
if header_recd is not None and header_recd == rej_headers[header_line]:
|
||||
self.send_error (400, 'Blacklisted Header ' + header_line + ' received')
|
||||
if not header_recd and header_recd == rej_headers[header_line]:
|
||||
self.send_error(400, 'Blacklisted Header %s received' %
|
||||
header_line)
|
||||
self.finish_headers()
|
||||
raise ServerError("Header " + header_line + ' received')
|
||||
|
||||
@ -434,6 +445,7 @@ class _Handler (BaseHTTPRequestHandler):
|
||||
}
|
||||
return extension_map.get(ext, "text/plain")
|
||||
|
||||
|
||||
class HTTPd(threading.Thread):
|
||||
server_class = StoppableHTTPServer
|
||||
handler = _Handler
|
||||
@ -456,4 +468,4 @@ class HTTPSd (HTTPd):
|
||||
|
||||
server_class = HTTPSServer
|
||||
|
||||
# vim: set ts=4 sts=4 sw=4 tw=80 et :
|
||||
# vim: set ts=4 sts=4 sw=4 tw=79 et :
|
||||
|
@ -28,9 +28,10 @@ class BaseTest:
|
||||
Attributes should not be defined outside __init__.
|
||||
"""
|
||||
self.name = name
|
||||
self.pre_configs = pre_hook or {} # if pre_hook == None, then
|
||||
# {} (an empty dict object) is
|
||||
# passed to self.pre_configs
|
||||
# if pre_hook == None, then {} (an empty dict object) is passed to
|
||||
# self.pre_configs
|
||||
self.pre_configs = pre_hook or {}
|
||||
|
||||
self.test_params = test_params or {}
|
||||
self.post_configs = post_hook or {}
|
||||
self.protocols = protocols
|
||||
@ -109,9 +110,14 @@ class BaseTest:
|
||||
if gdb == "1":
|
||||
cmd_line = 'gdb --args %s %s ' % (wget_path, wget_options)
|
||||
elif valgrind == "1":
|
||||
cmd_line = 'valgrind --error-exitcode=301 --leak-check=yes --track-origins=yes %s %s ' % (wget_path, wget_options)
|
||||
cmd_line = 'valgrind --error-exitcode=301 ' \
|
||||
'--leak-check=yes ' \
|
||||
'--track-origins=yes ' \
|
||||
'%s %s ' % (wget_path, wget_options)
|
||||
elif valgrind not in ("", "0"):
|
||||
cmd_line = '%s %s %s ' % (os.getenv("VALGRIND_TESTS", ""), wget_path, wget_options)
|
||||
cmd_line = '%s %s %s ' % (os.getenv("VALGRIND_TESTS", ""),
|
||||
wget_path,
|
||||
wget_options)
|
||||
else:
|
||||
cmd_line = '%s %s ' % (wget_path, wget_options)
|
||||
|
||||
@ -183,7 +189,8 @@ class BaseTest:
|
||||
|
||||
def _replace_substring(self, string):
|
||||
"""
|
||||
Replace first occurrence of "{{name}}" in @string with "getattr(self, name)".
|
||||
Replace first occurrence of "{{name}}" in @string with
|
||||
"getattr(self, name)".
|
||||
"""
|
||||
pattern = re.compile(r'\{\{\w+\}\}')
|
||||
match_obj = pattern.search(string)
|
||||
|
Loading…
Reference in New Issue
Block a user