Introducing Python based Test Environment

This commit is contained in:
Darshit Shah 2013-08-31 16:14:29 +05:30
parent eaf2fc9a4f
commit dccc154a0e
21 changed files with 1740 additions and 2 deletions

View File

@ -1,3 +1,8 @@
2013-08-31 Darshit Shah <darnir@gmail.com>
* configure.ac: Add testenv/Makefile to AC_CONFIG_FILES.
* Makefile.am: Add testenv to SUBDIRS
2013-07-23 Tim Ruehsen <tim.ruehsen@gmx.de>
* configure.ac: Remove AM_CONDITIONAL HAVE_NETTLE.

View File

@ -41,7 +41,7 @@ distuninstallcheck_listfiles = find . -type f | \
ACLOCAL_AMFLAGS = -I m4
# subdirectories in the distribution
SUBDIRS = lib src doc po tests util
SUBDIRS = lib src doc po tests util testenv
EXTRA_DIST = ChangeLog.README MAILING-LIST \
msdos/ChangeLog msdos/config.h msdos/Makefile.DJ \

View File

@ -569,6 +569,6 @@ dnl Create output
dnl
AC_CONFIG_FILES([Makefile src/Makefile doc/Makefile util/Makefile
po/Makefile.in tests/Makefile tests/WgetTest.pm
lib/Makefile])
lib/Makefile testenv/Makefile])
AC_CONFIG_HEADERS([src/config.h])
AC_OUTPUT

28
testenv/ChangeLog Normal file
View File

@ -0,0 +1,28 @@
2013-08-31 Darshit Shah <darnir@gmail.com>
* README: (newfile) Simple help / instructions about using the Test
Environment.
* Makefile.am: (newfile) Makefile for the Test Environment. Uses the
Automake Parallel Test Harness
* WgetTest.py: (newfile) Base module that executes the Test.
* HTTPServer.py: (newfile) Contains the custom HTTP Server for the
Test Environment. Creates an instance of http.server in Python3.
* FTPServer.py: (newfile) Overrides methods from pyftpdlib for use in
the Test Environment. ** Work under progress **.
* ColourTerm.py: (newfile) A custom module to output coloured text to
the terminal. Known to work on POSIX shells.
* Test-Proto.py: (newfile) A prototype Test File. This should be
copied when writing a new Test Case.
* Test-Content-disposition-2.py: Test Content Disposition clobbering
* Test-Content-disposition.py: Test Content Disposition Headers
* Test-O.py: Test Output filename command
* Test-auth-basic-fail.py: Test returncode on auth failure
* Test-auth-basic.py: Test Basic Auth negotiation
* Test-auth-both.py: Test handling of Multiple auth providers. This
test currently fails.
* Test-auth-digest.py: Test Digest Auth Negotiation
* Test-auth-no-challenge-url.py: Ensure --auth-no-challenge is handled
when auth details are in-URL.
* Test-auth-no-challenge.py: Ensure --auth-no-challenge is honoured
* Test-auth-retcode.py: Ensure correct return code after 403 Forbidden
response.

23
testenv/ColourTerm.py Normal file
View File

@ -0,0 +1,23 @@
import platform
from os import getenv
T_COLORS = {
'PURPLE' : '\033[95m',
'BLUE' : '\033[94m',
'GREEN' : '\033[92m',
'YELLOW' : '\033[93m',
'RED' : '\033[91m',
'ENDC' : '\033[0m'
}
def printer (color, string):
if platform.system () == 'Linux':
if getenv ("MAKE_CHECK", "False") == "True":
print (string)
else:
print (T_COLORS.get (color) + string + T_COLORS.get ('ENDC'))
else:
print (string)
# vim: set ts=8 sw=3 tw=0 et :

162
testenv/FTPServer.py Normal file
View File

@ -0,0 +1,162 @@
import os
import re
import threading
import socket
import pyftpdlib.__main__
from pyftpdlib.ioloop import IOLoop
import pyftpdlib.handlers as Handle
from pyftpdlib.servers import FTPServer
from pyftpdlib.authorizers import DummyAuthorizer
from pyftpdlib._compat import PY3, u, b, getcwdu, callable
class FTPDHandler (Handle.FTPHandler):
def ftp_LIST (self, path):
try:
iterator = self.run_as_current_user(self.fs.get_list_dir, path)
except (OSError, FilesystemError):
err = sys.exc_info()[1]
why = _strerror (err)
self.respond ('550 %s. ' % why)
else:
if self.isRule ("Bad List") is True:
iter_list = list ()
for flist in iterator:
line = re.compile (r'(\s+)').split (flist.decode ('utf-8'))
line[8] = '0'
iter_l = ''.join (line).encode ('utf-8')
iter_list.append (iter_l)
iterator = (n for n in iter_list)
producer = Handle.BufferedIteratorProducer (iterator)
self.push_dtp_data (producer, isproducer=True, cmd="LIST")
return path
def ftp_PASV (self, line):
if self._epsvall:
self.respond ("501 PASV not allowed after EPSV ALL.")
return
self._make_epasv(extmode=False)
if self.isRule ("FailPASV") is True:
del self.server.global_rules["FailPASV"]
self.socket.close ()
def isRule (self, rule):
rule_obj = self.server.global_rules[rule]
return False if not rule_obj else rule_obj[0]
class FTPDServer (FTPServer):
def set_global_rules (self, rules):
self.global_rules = rules
class FTPd(threading.Thread):
"""A threaded FTP server used for running tests.
This is basically a modified version of the FTPServer class which
wraps the polling loop into a thread.
The instance returned can be used to start(), stop() and
eventually re-start() the server.
"""
handler = FTPDHandler
server_class = FTPDServer
def __init__(self, addr=None):
os.mkdir ('server')
os.chdir ('server')
try:
HOST = socket.gethostbyname ('localhost')
except socket.error:
HOST = 'localhost'
USER = 'user'
PASSWD = '12345'
HOME = getcwdu ()
threading.Thread.__init__(self)
self.__serving = False
self.__stopped = False
self.__lock = threading.Lock()
self.__flag = threading.Event()
if addr is None:
addr = (HOST, 0)
authorizer = DummyAuthorizer()
authorizer.add_user(USER, PASSWD, HOME, perm='elradfmwM') # full perms
authorizer.add_anonymous(HOME)
self.handler.authorizer = authorizer
# lowering buffer sizes = more cycles to transfer data
# = less false positive test failures
self.handler.dtp_handler.ac_in_buffer_size = 32768
self.handler.dtp_handler.ac_out_buffer_size = 32768
self.server = self.server_class(addr, self.handler)
self.host, self.port = self.server.socket.getsockname()[:2]
os.chdir ('..')
def set_global_rules (self, rules):
self.server.set_global_rules (rules)
def __repr__(self):
status = [self.__class__.__module__ + "." + self.__class__.__name__]
if self.__serving:
status.append('active')
else:
status.append('inactive')
status.append('%s:%s' % self.server.socket.getsockname()[:2])
return '<%s at %#x>' % (' '.join(status), id(self))
@property
def running(self):
return self.__serving
def start(self, timeout=0.001):
"""Start serving until an explicit stop() request.
Polls for shutdown every 'timeout' seconds.
"""
if self.__serving:
raise RuntimeError("Server already started")
if self.__stopped:
# ensure the server can be started again
FTPd.__init__(self, self.server.socket.getsockname(), self.handler)
self.__timeout = timeout
threading.Thread.start(self)
self.__flag.wait()
def run(self):
self.__serving = True
self.__flag.set()
while self.__serving:
self.__lock.acquire()
self.server.serve_forever(timeout=self.__timeout, blocking=False)
self.__lock.release()
self.server.close_all()
def stop(self):
"""Stop serving (also disconnecting all currently connected
clients) by telling the serve_forever() loop to stop and
waits until it does.
"""
if not self.__serving:
raise RuntimeError("Server not started yet")
self.__serving = False
self.__stopped = True
self.join()
def mk_file_sys (file_list):
os.chdir ('server')
for name, content in file_list.items ():
file_h = open (name, 'w')
file_h.write (content)
file_h.close ()
os.chdir ('..')
def filesys ():
fileSys = dict ()
os.chdir ('server')
for parent, dirs, files in os.walk ('.'):
for filename in files:
file_handle = open (filename, 'r')
file_content = file_handle.read ()
fileSys[filename] = file_content
os.chdir ('..')
return fileSys

377
testenv/HTTPServer.py Normal file
View File

@ -0,0 +1,377 @@
from multiprocessing import Process, Queue
from http.server import HTTPServer, BaseHTTPRequestHandler
from base64 import b64encode
from random import random
from hashlib import md5
import os
import re
class InvalidRangeHeader (Exception):
""" Create an Exception for handling of invalid Range Headers. """
# Maybe: This exception should be generalized to handle other issues too.
def __init__ (self, err_message):
self.err_message = err_message
class StoppableHTTPServer (HTTPServer):
""" Define methods for configuring the Server. """
def server_conf (self, filelist, conf_dict):
""" Set Server Rules and File System for this instance.
This method should be called before the Server is forked into a new
process. This is because of how the system-level fork() call works.
"""
self.server_configs = conf_dict
global fileSys
fileSys = filelist
def serve_forever (self, q):
""" Override method allowing for programmatical shutdown process. """
global queue
queue = q
self.stop = False
while not self.stop:
self.handle_request ()
class WgetHTTPRequestHandler (BaseHTTPRequestHandler):
""" Define methods for handling Test Checks. """
# List of Checks that are run on the Server-side.
tests = [
"expect_headers",
"reject_headers",
"is_authorized",
"custom_response",
"test_cookies"
]
def test_cookies (self):
cookie_recd = self.headers.get ('Cookie')
cookies = self.get_rule_list ('Cookie')
cookie_exp = cookies[0].cookie_value if cookies else None
if cookie_exp == cookie_recd:
return True
else:
self.send_response (400, "Cookie Mismatch")
self.finish_headers ()
return False
def get_rule_list (self, name):
r_list = self.rules.get (name) if name in self.rules else list ()
return r_list
def do_QUIT (self):
queue.put (fileSys)
self.send_response (200)
self.end_headers ()
self.server.stop = True
class __Handler (WgetHTTPRequestHandler):
""" Define Handler Methods for different Requests. """
InvalidRangeHeader = InvalidRangeHeader
protocol_version = 'HTTP/1.1'
""" Define functions for various HTTP Requests. """
def do_HEAD (self):
self.send_head ()
def do_GET (self):
content, start = self.send_head ()
if content:
if start is None:
self.wfile.write (content.encode ('utf-8'))
else:
self.wfile.write (content.encode ('utf-8')[start:])
def do_POST (self):
path = self.path[1:]
self.rules = self.server.server_configs.get (path)
if not self.custom_response ():
return (None, None)
if path in fileSys:
body_data = self.get_body_data ()
self.send_response (200)
self.send_header ("Content-type", "text/plain")
content = fileSys.pop (path) + "\n" + body_data
total_length = len (content)
fileSys[path] = content
self.send_header ("Content-Length", total_length)
self.finish_headers ()
try:
self.wfile.write (content.encode ('utf-8'))
except Exception:
pass
else:
self.send_put (path)
def do_PUT (self):
path = self.path[1:]
self.rules = self.server.server_configs.get (path)
if not self.custom_response ():
return (None, None)
fileSys.pop (path, None)
self.send_put (path)
""" End of HTTP Request Method Handlers. """
""" Helper functions for the Handlers. """
def parse_range_header (self, header_line, length):
if header_line is None:
return None
if not header_line.startswith ("bytes="):
raise InvalidRangeHeader ("Cannot parse header Range: %s" %
(header_line))
regex = re.match (r"^bytes=(\d*)\-$", header_line)
range_start = int (regex.group (1))
if range_start >= length:
raise InvalidRangeHeader ("Range Overflow")
return range_start
def get_body_data (self):
cLength_header = self.headers.get ("Content-Length")
cLength = int (cLength_header) if cLength_header is not None else 0
body_data = self.rfile.read (cLength).decode ('utf-8')
return body_data
def send_put (self, path):
body_data = self.get_body_data ()
self.send_response (201)
fileSys[path] = body_data
self.send_header ("Content-type", "text/plain")
self.send_header ("Content-Length", len (body_data))
self.finish_headers ()
try:
self.wfile.write (body_data.encode ('utf-8'))
except Exception:
pass
def send_cust_headers (self):
header_obj_list = self.get_rule_list ('SendHeader')
if header_obj_list:
header_obj = header_obj_list[0]
for header in header_obj.headers:
self.send_header (header, header_obj.headers[header])
def finish_headers (self):
self.send_cust_headers ()
self.end_headers ()
def custom_response (self):
codes = self.get_rule_list ('Response')
if codes:
self.send_response (codes[0].response_code)
self.finish_headers ()
return False
else:
return True
def base64 (self, data):
string = b64encode (data.encode ('utf-8'))
return string.decode ('utf-8')
def send_challenge (self, auth_type):
if auth_type == "Both":
self.send_challenge ("Digest")
self.send_challenge ("Basic")
return
if auth_type == "Basic":
challenge_str = 'Basic realm="Wget-Test"'
elif auth_type == "Digest" or auth_type == "Both_inline":
self.nonce = md5 (str (random ()).encode ('utf-8')).hexdigest ()
self.opaque = md5 (str (random ()).encode ('utf-8')).hexdigest ()
challenge_str = 'Digest realm="Test", nonce="%s", opaque="%s"' %(
self.nonce,
self.opaque)
challenge_str += ', qop="auth"'
if auth_type == "Both_inline":
challenge_str = 'Basic realm="Wget-Test", ' + challenge_str
self.send_header ("WWW-Authenticate", challenge_str)
def authorize_Basic (self, auth_header, auth_rule):
if auth_header is None or auth_header.split(' ')[0] != 'Basic':
return False
else:
self.user = auth_rule.auth_user
self.passw = auth_rule.auth_pass
auth_str = "Basic " + self.base64 (self.user + ":" + self.passw)
return True if auth_str == auth_header else False
def parse_auth_header (self, auth_header):
n = len("Digest ")
auth_header = auth_header[n:].strip()
items = auth_header.split(", ")
key_values = [i.split("=", 1) for i in items]
key_values = [(k.strip(), v.strip().replace('"', '')) for k, v in key_values]
return dict(key_values)
def KD (self, secret, data):
return self.H (secret + ":" + data)
def H (self, data):
return md5 (data.encode ('utf-8')).hexdigest ()
def A1 (self):
return "%s:%s:%s" % (self.user, "Test", self.passw)
def A2 (self, params):
return "%s:%s" % (self.command, params["uri"])
def check_response (self, params):
if "qop" in params:
data_str = params['nonce'] \
+ ":" + params['nc'] \
+ ":" + params['cnonce'] \
+ ":" + params['qop'] \
+ ":" + self.H (self.A2 (params))
else:
data_str = params['nonce'] + ":" + self.H (self.A2 (params))
resp = self.KD (self.H (self.A1 ()), data_str)
return True if resp == params['response'] else False
def authorize_Digest (self, auth_header, auth_rule):
if auth_header is None or auth_header.split(' ')[0] != 'Digest':
return False
else:
self.user = auth_rule.auth_user
self.passw = auth_rule.auth_pass
params = self.parse_auth_header (auth_header)
pass_auth = True
if self.user != params['username'] or \
self.nonce != params['nonce'] or self.opaque != params['opaque']:
pass_auth = False
req_attribs = ['username', 'realm', 'nonce', 'uri', 'response']
for attrib in req_attribs:
if not attrib in params:
pass_auth = False
if not self.check_response (params):
pass_auth = False
return pass_auth
def authorize_Both (self, auth_header, auth_rule):
return False
def authorize_Both_inline (self, auth_header, auth_rule):
return False
def is_authorized (self):
is_auth = True
auth_rule = self.get_rule_list ('Authentication')
if auth_rule:
auth_header = self.headers.get ("Authorization")
req_auth = auth_rule[0].auth_type
if req_auth == "Both" or req_auth == "Both_inline":
auth_type = auth_header.split(' ')[0] if auth_header else req_auth
else:
auth_type = req_auth
assert hasattr (self, "authorize_" + auth_type)
is_auth = getattr (self, "authorize_" + auth_type) (auth_header, auth_rule[0])
if is_auth is False:
self.send_response (401)
self.send_challenge (auth_type)
self.finish_headers ()
return is_auth
def expect_headers (self):
""" This is modified code to handle a few changes. Should be removed ASAP """
exp_headers_obj = self.get_rule_list ('ExpectHeader')
if exp_headers_obj:
exp_headers = exp_headers_obj[0].headers
for header_line in exp_headers:
header_re = self.headers.get (header_line)
if header_re is None or header_re != exp_headers[header_line]:
self.send_error (400, 'Expected Header not Found')
self.end_headers ()
return False
return True
def reject_headers (self):
rej_headers_list = self.get_rule_list ("RejectHeader")
if rej_headers_list:
rej_headers = rej_headers_list[0].headers
for header_line in rej_headers:
header_re = self.headers.get (header_line)
if header_re is not None and header_re == rej_headers[header_line]:
self.send_error (400, 'Blacklisted Header was Sent')
self.end_headers ()
return False
return True
def send_head (self):
""" Common code for GET and HEAD Commands.
This method is overriden to use the fileSys dict.
"""
path = self.path[1:]
if path in fileSys:
self.rules = self.server.server_configs.get (path)
testPassed = True
for check in self.tests:
if testPassed is True:
assert hasattr (self, check)
testPassed = getattr (self, check) ()
else:
return (None, None)
content = fileSys.get (path)
content_length = len (content)
try:
self.range_begin = self.parse_range_header (
self.headers.get ("Range"), content_length)
except InvalidRangeHeader as ae:
# self.log_error("%s", ae.err_message)
if ae.err_message == "Range Overflow":
self.send_response (416)
self.finish_headers ()
return (None, None)
else:
self.range_begin = None
if self.range_begin is None:
self.send_response (200)
else:
self.send_response (206)
self.send_header ("Accept-Ranges", "bytes")
self.send_header ("Content-Range",
"bytes %d-%d/%d" % (self.range_begin,
content_length - 1,
content_length))
content_length -= self.range_begin
self.send_header ("Content-type", "text/plain")
self.send_header ("Content-Length", content_length)
self.finish_headers ()
return (content, self.range_begin)
else:
self.send_error (404, "Not Found")
return (None, None)
def create_server ():
server = StoppableHTTPServer (("localhost", 0), __Handler)
return server
def spawn_server (server):
global q
q = Queue()
server_process = Process (target=server.serve_forever, args=(q,))
server_process.start ()
def ret_fileSys ():
return (q.get (True))
# vim: set ts=8 sts=4 sw=3 tw=0 et :

42
testenv/Makefile.am Normal file
View File

@ -0,0 +1,42 @@
# Makefile for `wget' utility
# Copyright (C) 2013 Free Software Foundation, Inc.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with Wget. If not, see <http://www.gnu.org/licenses/>.
# Additional permission under GNU GPL version 3 section 7
# If you modify this program, or any covered work, by linking or
# combining it with the OpenSSL project's OpenSSL library (or a
# modified version of that library), containing parts covered by the
# terms of the OpenSSL or SSLeay licenses, the Free Software Foundation
# grants you additional permission to convey the resulting work.
# Corresponding Source for a non-source form of such a combination
# shall include the source code for the parts of OpenSSL used as well
# as that of the covered work.
AUTOMAKE_OPTIONS = parallel-tests
AM_TESTS_ENVIRONMENT = MAKE_CHECK=True; export MAKE_CHECK;
TESTS = Test-auth-basic-fail.py \
Test-auth-basic.py \
Test-auth-both.py \
Test-auth-digest.py \
Test-auth-no-challenge.py \
Test-auth-no-challenge-url.py \
Test-auth-retcode.py \
Test-Content-disposition-2.py \
Test-Content-disposition.py
XFAIL_TESTS = Test-auth-both.py
LOG_COMPILER = python3

213
testenv/README Normal file
View File

@ -0,0 +1,213 @@
This document describes the working of the GNU Wget Test Suite.
Install Instructions:
================================================================================
This Test Suite exploits the Parallel Test Harness available in GNU Autotools.
Since it uses features from a relatively recent verion of Autotools, the minimum
required version as been bumped up to 1.11. Even with Automake v1.11, one MUST
run automake with the --add-missing switch to add the required driver files to
the repo. This is not required as of Automake v1.13.
Run the './configure' command to generate the Makefile and then run 'make check'
to execute the Test Suite.
File List:
================================================================================
* HTTPServer.py: This file contains a custom, programmatically configurable
HTTP Server for testing Wget. It runs an instance of Python's http.server
module.
* WgetTest.py: This file contains various functions and global variables for
each instance of the server that is initiated. It includes functions to
start and stop the server, to initialze the test environment and to cleanup
after a test.
* Test-Proto.py: This is a prototype Test Case file. The file defines all
the acceptable elements and their uses. Typically, one must copy this file
and edit it for writing Test Cases.
* ColourTerm.py: A custom library for printing coloured output to the
terminal. Currently it only supports 4 colours in a *nix environment.
Working:
================================================================================
The Test Files are valid Python scripts and the default mask for them is 755.
A singular Test must be invoked in the following manner:
$ ./python3 <Name of Test File> OR
$ ./<Name of Test File>
The script will then initialize the various elements and pass them to an object
of the respective Test Class. A directory with the name <Test name>-test will be
created and the PWD will be changed to this directory. The server is then
spawned with the required configuration elements. A blocking call to Wget is
made with the command line arguments specified in the Test Case along with the
list of URLs that it must download. The server is killed once Wget returns and
the following checks are used to determine the pass/fail status of the test:
* Return Code: The Exit code of Wget is matched against the expected Exit
Code as mentioned in the Test Case File.
* Downloaded Files: Check whether the expected downloaded files exist on
disk.
* File Content: Test whether the file contents were correctly downloaded by
Wget and not corrupted mid-way.
* Excess Files: Check to see whether any unexpected files were downloaded
by Wget.
Exit Codes:
===============================================================================
Following is a list of Exit Status Codes for the tests:
* 0 Test Successful
* 77 Test Skipped
* 99 Hard Error
* 100 Test Failed
Tests are skipped when they are either not supported by the platform, or Wget is
not compiled with support for that feature. This feature has not yet been
implemented.
Hard Errors occur when the Test File could not be found or there were errors
while parsing it.
All exceptions should ideally be handled gracefully. If you see any unhandled
exceptions, please file a bug report at <bug-wget@gnu.org>
Environment Variables:
================================================================================
* NO_CLEANUP: Do not remove the temporary files created by the test.
This will prevent the ${testname}-test directory from being deleted
File Structure:
================================================================================
The test case files are Python scripts. It is believed that Python is a simple
yet elegant language and should be easy for everyone to comprehend. This test
suite is written with the objective of making it easy to write new tests. The
structure has been kept as intuitive as possible and should not require much
effort to get accustomed to.
All Test Files MUST begin with the following Three Lines:
#!/usr/bin/python3
from sys import exit
from WgetTest import {HTTPTest|FTPTest}, WgetFile
It is recommended that a small description of the Test Case is provided next.
This would be very helpful to future contributors.
Next, is the const variable, TEST_NAME that defines the name of the Test.
Each File in the Test must be represented as a WgetFile object. The WgetFile
Class has the following prototype:
WgetFile (String name, String contents, String timestamp, dict rules)
None except name is a mandatory paramter, one may pass only those parameters
that are required by the File object.
The timestamp string should be a valid Unix Timestamp as defined in RFC xxxx.
The rules object is a dictionary element, with the key as the Rule Name and
value as the Rule Data. In most cases, the Rule Data is another dictionary.
Both, the HTTPTest and FTPTest modules have the same prototype:
{
name,
pre_hook,
test_options,
post_hook
}
name expects the string name, and is usually passed the TEST_NAME variable.
The remaining three hooks, expect python dictionary objects.
Valid File Rules:
================================================================================
This section lists the currently supported File Rules and their structure.
* Authentication: Used when a File must require Authorization for access.
The value for this key is the following dictionary:
|-->Type : Basic|Digest|Both|Both_inline
|-->User : <Username>
--->Pass : <Password>
* ExpectHeader : The following Headers MUST exist in every Request for the
File. The value for this key is a dictionary object where each header is
represented as:
|-->Header Name : <Header Data>
* RejectHeader : This list of Headers must NEVER occur in a request. It
uses the same value format as ExpectHeader.
* SendHeader : This list of Headers will be sent in EVERY response to a
request for the respective file. It follows the same value format as
ExpectHeader.
* Response : The HTTP Response Code to send to a request for this File.
The value is an Integer that represents a valid HTTP Response Code.
Pre Test Hooks:
================================================================================
The Pre-Test Hooks are executed just after starting the server and just before
spawning an instance of the server. These are usually used for setting up the
Test Environment and Server Rules. The currently supported Pre-Test Hooks are:
* ServerFiles : A list of WgetFile objects that must exist on the Server
* LocalFiles : A list of WgetFile objects that exist locally on disk
before Wget is executed.
Since pre_test is a dictionary, one may not assume that the hooks will be
executed in the same order as they are defined.
Test Options:
================================================================================
The test_options dictionary defines the commands to be used when the Test is
executed. The currently supported options are:
* Urls : A list of the filenames that Wget must attempt to
download. The complete URL will be created and passed to Wget automatically.
* WgetCommands : A string consisting of the various commandline switches
sent to Wget upon invokation. Any data placed between {{ }} in this string
will be replaced with the contents of self.<data> before being passed to
Wget. This is particularly useful for getting the hostname and port for a
file. While all Download URL's are passed to Urls, a notable exception is
when in-url authentication is used. In such a case, the URL is specified in
the WgetCommands string.
Post-Test Hooks:
================================================================================
These hooks are executed as soon as the call to Wget returns. The post-test
hooks are usually used to run checks on the data, files downloaded, return code,
etc. The following hooks are currently supported:
* ExpectedRetcode : This is an integer value of the ReturnCode with which
Wget is expected to exit.
* ExpectedFiles : This is a list of WgetFile objects of the files that
must exist locally on disk in the Test directory.
Writing New Tests:
================================================================================
See Test-Proto.py for an example of how to write Test Case files. The
recommended method for writing new Test Case files is to copy Test-Proto.py and
modify it to ones needs.
In case you require any functionality that is not currently defined in List of
Rules defined above, you should add the required code in WgetTest.py.
In most cases, one requires a new Rule to be added for the Server to follow.
In such a case, create a new Class in WgetTest.py with the same name as the Rule
and define an __init__ () function to handle the data. A method must also be
defined in HTTPTest / FTPTest modules to handle the said Rule.
Once a new Test File is created, it must be added to the TESTS variable in
Makefile.am. This way the Test will be executed on running a 'make check'.
If a Test is expected to fail on the current master branch, then the Test should
also be added to the XFAIL_TESTS variable. This will allow expected failures to
pass through. If a test mentioned in the XFAIL_TESTS variable passes, it gets
red-flagged as a XPASS.
Requirements:
================================================================================
1. Python >= 3.0
2. Automake >= 1.11

View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget parses the Content-Disposition header
correctly and creates the appropriate file when the said filename exists.
"""
TEST_NAME = "Content Disposition Clobber"
############# File Definitions ###############################################
File1 = "Teapot"
File2 = "The Teapot Protocol"
File2_rules = {
"SendHeader" : {
"Content-Disposition" : "Attachment; filename=HTTP.Teapot"
}
}
A_File = WgetFile ("HTTP.Teapot", File1)
B_File = WgetFile ("File2", File2, rules=File2_rules)
WGET_OPTIONS = "-d --content-disposition"
WGET_URLS = ["File2"]
Files = [B_File]
Existing_Files = [A_File]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [WgetFile ("HTTP.Teapot.1", File2), A_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files,
"LocalFiles" : Existing_Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

View File

@ -0,0 +1,55 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget parses the Content-Disposition header
correctly and creates a local file accordingly.
"""
TEST_NAME = "Content Disposition Header"
############# File Definitions ###############################################
File1 = """All that is gold does not glitter,
Not all those who wander are lost;
The old that is strong does not wither,
Deep roots are not reached by the frost.
From the ashes a fire shall be woken,
A light from the shadows shall spring;
Renewed shall be blade that was broken,
The crownless again shall be king."""
File1_rules = {
"SendHeader" : {
"Content-Disposition" : "Attachment; filename=JRR.Tolkein"
}
}
A_File = WgetFile ("LOTR", File1, rules=File1_rules)
WGET_OPTIONS = "-d --content-disposition"
WGET_URLS = ["LOTR"]
Files = [A_File]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [WgetFile ("JRR.Tolkein", File1)]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

44
testenv/Test-O.py Executable file
View File

@ -0,0 +1,44 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget correctly handles the -O command for output
filenames.
"""
TEST_NAME = "Output Filename Command"
############# File Definitions ###############################################
File1 = "Test Contents."
A_File = WgetFile ("File1", File1)
WGET_OPTIONS = "-d -O NewFile.txt"
WGET_URLS = ["File1"]
Files = [A_File]
ExistingFiles = [A_File]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [WgetFile ("NewFile.txt", File1)]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

70
testenv/Test-Proto.py Executable file
View File

@ -0,0 +1,70 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This is a Prototype Test File.
Ideally this File should be copied and edited to write new tests.
"""
TEST_NAME = "Prototype"
############# File Definitions ###############################################
File1 = "Would you like some Tea?"
File2 = "With lemon or cream?"
File3 = "Sure you're joking Mr. Feynman"
File1_rules = {
"Authentication" : {
"Type" : "Both",
"User" : "Sauron",
"Pass" : "TheEye"
},
"RejectHeader" : {
"Authorization" : "Basic U2F1cm9uOlRoZUV5ZQ=="
}
}
File2_rules = {
"Authentication" : {
"Type" : "Both_inline",
"User" : "Sauron",
"Pass" : "TheEye"
},
"SendHeader" : {
"Content-Disposition" : "Attachment; filename=newfile"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
B_File = WgetFile ("File2", File2, rules=File2_rules)
C_File = WgetFile ("File3", File3)
WGET_OPTIONS = "-d --content-disposition --user=Sauron --password=TheEye"
WGET_URLS = ["File1", "File2"]
Files = [A_File, B_File]
Existing_Files = [C_File]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File, WgetFile ("newfile", File2), C_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files,
"LocalFiles" : Existing_Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

50
testenv/Test-auth-basic-fail.py Executable file
View File

@ -0,0 +1,50 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget returns the correct exit code when Basic
authentcation failes due to a username/password error.
"""
TEST_NAME = "Basic Authentication Failure"
############# File Definitions ###############################################
File1 = "I am an invisble man."
File1_rules = {
"Authentication" : {
"Type" : "Basic",
"User" : "Sauron",
"Pass" : "TheEye"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
WGET_OPTIONS = "-d --user=Sauron --password=Eye"
WGET_URLS = ["File1"]
Files = [A_File]
ExpectedReturnCode = 6
ExpectedDownloadedFiles = []
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

58
testenv/Test-auth-basic.py Executable file
View File

@ -0,0 +1,58 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures Wget's Basic Authorization Negotiation.
Also, we ensure that Wget saves the host after a successfull auth and
doesn't wait for a challenge the second time.
"""
TEST_NAME = "Basic Authorization"
############# File Definitions ###############################################
File1 = "I am an invisble man."
File2 = "I too am an invisible man."
File1_rules = {
"Authentication" : {
"Type" : "Basic",
"User" : "Sauron",
"Pass" : "TheEye"
}
}
File2_rules = {
"ExpectHeader" : {
"Authorization" : "Basic U2F1cm9uOlRoZUV5ZQ=="
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
B_File = WgetFile ("File2", File2, rules=File2_rules)
WGET_OPTIONS = "-d --user=Sauron --password=TheEye"
WGET_URLS = ["File1", "File2"]
Files = [A_File, B_File]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File, B_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

76
testenv/Test-auth-both.py Executable file
View File

@ -0,0 +1,76 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures Wget's Basic Authorization Negotiation.
Also, we ensure that Wget saves the host after a successfull auth and
doesn't wait for a challenge the second time.
"""
TEST_NAME = "Multiple authentication support"
############# File Definitions ###############################################
File1 = "Would you like some Tea?"
File2 = "With lemon or cream?"
File3 = "Sure you're joking Mr. Feynman"
File1_rules = {
"Authentication" : {
"Type" : "Both",
"User" : "Sauron",
"Pass" : "TheEye"
},
"RejectHeader" : {
"Authorization" : "Basic U2F1cm9uOlRoZUV5ZQ=="
}
}
File2_rules = {
"Authentication" : {
"Type" : "Both_inline",
"User" : "Sauron",
"Pass" : "TheEye"
},
"RejectHeader" : {
"Authorization" : "Basic U2F1cm9uOlRoZUV5ZQ=="
}
}
File3_rules = {
"Authentication" : {
"Type" : "Digest",
"User" : "Sauron",
"Pass" : "TheEye"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
B_File = WgetFile ("File2", File2, rules=File2_rules)
C_File = WgetFile ("File3", File3, rules=File3_rules)
WGET_OPTIONS = "-d --user=Sauron --password=TheEye"
WGET_URLS = ["File1", "File2", "File3"]
Files = [A_File, B_File, C_File]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File, B_File, C_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

49
testenv/Test-auth-digest.py Executable file
View File

@ -0,0 +1,49 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures Wget's Digest Authorization Negotiation.
"""
TEST_NAME = "Digest Authorization"
############# File Definitions ###############################################
File1 = "Need a cookie?"
File1_rules = {
"Authentication" : {
"Type" : "Digest",
"User" : "Pacman",
"Pass" : "Omnomnom"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
WGET_OPTIONS = "-d --user=Pacman --password=Omnomnom"
WGET_URLS = ["File1"]
Files = [A_File]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

View File

@ -0,0 +1,53 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures Wget's Basic Authorization Negotiation, when credentials
are provided in-URL
"""
TEST_NAME = "Auth no challenge in URL"
############# File Definitions ###############################################
File1 = "Need a cookie?"
File1_rules = {
"Authentication" : {
"Type" : "Basic",
"User" : "Pacman",
"Pass" : "Omnomnom"
},
"ExpectHeader" : {
"Authorization" : "Basic UGFjbWFuOk9tbm9tbm9t"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
WGET_OPTIONS = "-d --auth-no-challenge http://Pacman:Omnomnom@localhost:{{port}}/File1"
WGET_URLS = []
Files = [A_File]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

View File

@ -0,0 +1,53 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures Wget's Basic Authorization Negotiation, when the
--auth-no-challenge command is used.
"""
TEST_NAME = "Digest Authorization"
############# File Definitions ###############################################
File1 = "Need a cookie?"
File1_rules = {
"Authentication" : {
"Type" : "Basic",
"User" : "Pacman",
"Pass" : "Omnomnom"
},
"ExpectHeader" : {
"Authorization" : "Basic UGFjbWFuOk9tbm9tbm9t"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
WGET_OPTIONS = "-d --auth-no-challenge --user=Pacman --password=Omnomnom"
WGET_URLS = ["File1"]
Files = [A_File]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

49
testenv/Test-auth-retcode.py Executable file
View File

@ -0,0 +1,49 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget returns the correct return code when sent
a 403 Forbidden by the Server.
"""
TEST_NAME = "Forbidden Retcode"
############# File Definitions ###############################################
File1 = "Apples and Oranges? Really?"
File1_rules = {
"Response" : 403
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
WGET_OPTIONS = "-d"
WGET_URLS = ["File1"]
Files = [A_File]
ExpectedReturnCode = 8
ExpectedDownloadedFiles = []
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

279
testenv/WgetTest.py Normal file
View File

@ -0,0 +1,279 @@
import os
import shutil
import shlex
import sys
import traceback
import HTTPServer
import http.client
import re
from subprocess import call
from collections import defaultdict
from ColourTerm import printer
""" A Custom Exception raised by the Test Environment. """
class TestFailed (Exception):
def __init__ (self, error):
self.error = error
""" Class that defines methods common to both HTTP and FTP Tests. """
class CommonMethods:
TestFailed = TestFailed
def init_test_env (self, name):
testDir = name + "-test"
try:
os.mkdir (testDir)
except FileExistsError:
shutil.rmtree (testDir)
os.mkdir (testDir)
os.chdir (testDir)
self.tests_passed = True
def get_domain_addr (self, addr):
self.port = str (addr[1])
return addr[0] + ":" + str(addr[1]) + "/"
def exec_wget (self, options, urls, domain):
cmd_line = self.get_cmd_line (options, urls, domain)
params = shlex.split (cmd_line)
retcode = call (params)
return retcode
def get_cmd_line (self, options, urls, domain):
TEST_PATH = os.path.abspath (".")
WGET_PATH = os.path.join (TEST_PATH, "..", "..", "src", "wget")
WGET_PATH = os.path.abspath (WGET_PATH)
cmd_line = WGET_PATH + " " + options + " "
for url in urls:
cmd_line += domain + url + " "
print (cmd_line)
return cmd_line
def __test_cleanup (self):
testDir = self.name + "-test"
os.chdir ('..')
try:
if os.getenv ("NO_CLEANUP") is None:
shutil.rmtree (testDir)
except Exception as ae:
print ("Unknown Exception while trying to remove Test Environment.")
def _exit_test (self):
self.__test_cleanup ()
def begin (self):
return 0 if self.tests_passed else 100
""" Methods to check if the Test Case passes or not. """
def __gen_local_filesys (self):
file_sys = dict ()
for parent, dirs, files in os.walk ('.'):
for name in files:
onefile = dict ()
# Create the full path to file, removing the leading ./
# Might not work on non-unix systems. Someone please test.
filepath = os.path.join (parent, name)
file_handle = open (filepath, 'r')
file_content = file_handle.read ()
onefile['content'] = file_content
filepath = filepath[2:]
file_sys[filepath] = onefile
file_handle.close ()
return file_sys
def __check_downloaded_files (self, exp_filesys):
local_filesys = self.__gen_local_filesys ()
for files in exp_filesys:
if files.name in local_filesys:
local_file = local_filesys.pop (files.name)
if files.content != local_file ['content']:
raise TestFailed ("Contents of " + files.name + " do not match")
else:
raise TestFailed ("Expected file " + files.name + " not found")
if local_filesys:
print (local_filesys)
raise TestFailed ("Extra files downloaded.")
""" Test Rule Definitions """
""" This should really be taken out soon. All this extra stuff to ensure
re-use of old code is crap. Someone needs to re-write it. The new rework
branch is much better written, but integrating it requires effort.
All these classes should never exist. The whole server needs to modified.
"""
class Authentication:
def __init__ (self, auth_obj):
self.auth_type = auth_obj['Type']
self.auth_user = auth_obj['User']
self.auth_pass = auth_obj['Pass']
class ExpectHeader:
def __init__ (self, header_obj):
self.headers = header_obj
class RejectHeader:
def __init__ (self, header_obj):
self.headers = header_obj
class Response:
def __init__ (self, retcode):
self.response_code = retcode
class SendHeader:
def __init__ (self, header_obj):
self.headers = header_obj
def get_server_rules (self, file_obj):
""" The handling of expect header could be made much better when the
options are parsed in a true and better fashion. For an example,
see the commented portion in Test-basic-auth.py.
"""
server_rules = defaultdict (list)
for rule in file_obj.rules:
r_obj = getattr (self, rule) (file_obj.rules[rule])
server_rules[rule].append (r_obj)
return server_rules
""" Pre-Test Hook Function Calls """
def ServerFiles (self, server_files):
file_list = dict ()
server_rules = dict ()
for file_obj in server_files:
file_list[file_obj.name] = file_obj.content
rule_obj = self.get_server_rules (file_obj)
server_rules[file_obj.name] = rule_obj
self.server.server_conf (file_list, server_rules)
def LocalFiles (self, local_files):
for file_obj in local_files:
file_handler = open (file_obj.name, "w")
file_handler.write (file_obj.content)
file_handler.close ()
""" Test Option Function Calls """
def WgetCommands (self, command_list):
pattern = re.compile ('\{\{\w+\}\}')
match_obj = pattern.search (command_list)
if match_obj is not None:
rep = match_obj.group()
temp = getattr (self, rep.strip ('{}'))
command_list = command_list.replace (rep, temp)
self.options = command_list
def Urls (self, url_list):
self.urls = url_list
""" Post-Test Hook Function Calls """
def ExpectedRetcode (self, retcode):
if self.act_retcode != retcode:
pr = "Return codes do not match.\nExpected: " + str(retcode) + "\nActual: " + str(self.act_retcode)
raise TestFailed (pr)
def ExpectedFiles (self, exp_filesys):
self.__check_downloaded_files (exp_filesys)
""" Class for HTTP Tests. """
class HTTPTest (CommonMethods):
# Temp Notes: It is expected that when pre-hook functions are executed, only an empty test-dir exists.
# pre-hook functions are executed just prior to the call to Wget is made.
# post-hook functions will be executed immediately after the call to Wget returns.
def __init__ (
self,
name="Unnamed Test",
pre_hook=dict(),
test_params=dict(),
post_hook=dict()
):
try:
self.HTTP_setup (name, pre_hook, test_params, post_hook)
except TestFailed as tf:
printer ("RED", "Error: " + tf.error)
self.act_retcode = 100
self.tests_passed = False
except Exception as ae:
printer ("RED", "Unhandled Exception Caught.")
print ( ae.__str__ ())
traceback.print_exc ()
self.act_retcode = 100
else:
printer ("GREEN", "Test Passed")
finally:
self._exit_test ()
def HTTP_setup (self, name, pre_hook, test_params, post_hook):
self.name = name
printer ("BLUE", "Running Test " + self.name)
self.init_test_env (name)
self.server = self.init_HTTP_Server ()
self.domain = self.get_domain_addr (self.server.server_address)
for pre_hook_func in pre_hook:
try:
assert hasattr (self, pre_hook_func)
except AssertionError as ae:
self.stop_HTTP_Server (self.server)
raise TestFailed ("Pre Test Function " + pre_hook_func + " not defined.")
getattr (self, pre_hook_func) (pre_hook[pre_hook_func])
for test_func in test_params:
try:
assert hasattr (self, test_func)
except AssertionError as ae:
self.stop_HTTP_Server (self.server)
raise TestFailed ("Test Option " + test_func + " unknown.")
getattr (self, test_func) (test_params[test_func])
HTTPServer.spawn_server (self.server)
self.act_retcode = self.exec_wget (self.options, self.urls, self.domain)
self.stop_HTTP_Server ()
for post_hook_func in post_hook:
try:
assert hasattr (self, post_hook_func)
except AssertionError as ae:
raise TestFailed ("Post Test Function " + post_hook_func + " not defined.")
getattr (self, post_hook_func) (post_hook[post_hook_func])
def init_HTTP_Server (self):
server = HTTPServer.create_server ()
return server
#server = HTTPServer.HTTPd ()
#server.start (self)
#return server
def stop_HTTP_Server (self):
conn = http.client.HTTPConnection (self.domain.strip ('/'))
conn.request ("QUIT", "/")
self.fileSys = HTTPServer.ret_fileSys ()
conn.getresponse ()
#server.stop ()
""" WgetFile is a File Data Container object """
class WgetFile:
def __init__ (
self,
name,
content="Test Contents",
timestamp=None,
rules=dict()
):
self.name = name
self.content = content
self.timestamp = timestamp
self.rules = rules