Introducing Python based Test Environment

Squashed Commit, of the following commits:

7743384 Update documentation to reflect changes in code
b703633 Add feature that allows to ensure that Wget correctly crawls the website in recursive mode
0758f47 Add new test for recursive spider mode
43bb61b Smartly guess content type header
d4d0e63 Support substring replace in File Contents too
f578500 Compatibility fix with multiple servers
8b1a9b6 Extend Functionality to support spawning multiple servers
e84192a Use the provided calls to shutdown server instead of rewriting it
99659f3 Improve Documentation
cb94e52 Slight code cleanup. Remove unused code
886ac1a Shift to new Threading Model from Multiprocessing model
e74c2ec Add new test for POST Requests
48644f1 Print diff when file contents don't match
b6f9efe Add tests for Cookie support
4c9e6b4 Document pending work
e13bc90 Add new test to ensure Content Disposition and Auth work together
60d1f4d Add new Test for Continue command
738b299 Add test, Test-Head
9b9d16b Edit non-unique TEST_NAME variable
ae958db Minor optimizations to the way Server Rules are executed
50b4f0c The rules need not be a defaultdict.
dccc154 Introducing Python based Test Environment
This commit is contained in:
Darshit Shah 2014-07-24 16:25:44 +05:30
parent eab853b7e6
commit 7cbe8141d1
31 changed files with 2671 additions and 2 deletions

View File

@ -1,3 +1,8 @@
2013-08-31 Darshit Shah <darnir@gmail.com>
* configure.ac: Add testenv/Makefile to AC_CONFIG_FILES.
* Makefile.am: Add testenv to SUBDIRS
2014-07-22 Darshit Shah <darnir@gmail.com>
* configure.ac: Fix broken code for detecting libpsl

View File

@ -41,7 +41,7 @@ distuninstallcheck_listfiles = find . -type f | \
ACLOCAL_AMFLAGS = -I m4
# subdirectories in the distribution
SUBDIRS = lib src doc po tests util
SUBDIRS = lib src doc po tests util testenv
EXTRA_DIST = ChangeLog.README MAILING-LIST \
msdos/ChangeLog msdos/config.h msdos/Makefile.DJ \

View File

@ -579,7 +579,7 @@ dnl Create output
dnl
AC_CONFIG_FILES([Makefile src/Makefile doc/Makefile util/Makefile
po/Makefile.in tests/Makefile tests/WgetTest.pm
lib/Makefile])
lib/Makefile testenv/Makefile])
AC_CONFIG_HEADERS([src/config.h])
AC_OUTPUT

222
testenv/ChangeLog Normal file
View File

@ -0,0 +1,222 @@
2013-09-16 Darshit Shah <darnir@gmail.com>
* README: Update documentation
2013-09-14 Darshit Shah <darnir@gmail.com>
* HTTPServer.py (StoppableHTTPServer): Define object variable
request_headers which stores a list of requests received by the server
(StoppableHTTPServer.get_req_headers): Return the list of Request
Headers stored by the server
(_Handler.do_HEAD): Send the Request MEthod string for identification
(_Handler.do_GET): Same
(_Handler.__log_request): Log the request in Request_Headers list
(_Handler.send_head): Make a call to __log_request
* Test--spider-r.py: Add new list, Request_List, which contains all
the requests that Wget is expected to send. This will allow for
fine-grained tests on recursive downloading.
* WgetTest.py (CommonMethods.FilesCrawled): New Post-Test Hook, that
ensures that all the expected Files on the server were accessed as
expected.
(HTTPTest.stop_HTTP_server): On stopping server, asks it to respond
with list of all requests it received.
2013-09-13 Darshit Shah <darnir@gmail.com>
* Test--spider-r.py: Test retrieval in recursive spider mode.
* Makefile.am: add new file
2013-09-13 Darshit Shah <darnir@gmail.com>
* HTTPServer.py (_Handler.do_HEAD): If requested path is /, respond
with /index.html
(_Handler.do_HEAD): Smartly guess value of Content-Type Header from
file extension
(_Handler.guess_type): Use a preset list of extensions and
Content-Type strings. If the extension matches one in the list, use
that string, else default to "text/plain"
2013-09-13 Darshit Shah <darnir@gmail.com>
* WgetTest.py (CommonMethods._replace_substring): New method that will
replace a substring delimited by {{ }} characters by the value of
self.<substring> variable
(CommonMethods.WgetCommands): Use the _replace_substring () call to
replace the substrings in the the command line.
(CommonMethods.ServerFiles): Run the _replace_substring () method on
the File contents too.
2013-09-11 Darshit Shah <darnir@gmail.com>
* WgetTest.py (CommonMethods.exec_wget): Expect domain_list instead of
domain.
(CommonMethods.get_cmd_line): Same. Generate command line by
prepending to each file it's respective domain string
(CommonMethods.ServerFiles): Generate file_list and server_rules for
each Server and set the config details
(HTTPTest): New named parameter, servers which signifies number of
servers to spawn
(HTTPTest.HTTP_setup): This method now takes servers as a new
parameter. Instead of storing server and domain, we now store
server_list and domain_list. Each server must be initialized through a
loop.
(HTTPTest.stop_HTTP_server): Stop all servers in a loop.
* Test-Parallel-Proto.py: Prototype test file for multiple servers.
2013-09-10 Darshit Shah <darnir@gmail.com>
* WgetTest.py (HTTPTest.stop_HTTP_server): With the threaded servers,
we can simply use the socketserver.shutdown() method to close the
server instead of sending a QUIT command
* HTTPServer.py (StoppabelHTTPServer.serve_forever): Delete method. No
need to override this method anymore.
(WgetHTTPRequestHandler.do_QUIT): No longer required
(HTTPd): Rename self.server to self.server_inst to reduce ambiguity
when referenced from WgetTest
2013-09-08 Darshit Shah <darnir@gmail.com>
* README (File Structure): Add explanation about various variables
used consistently across all tests.
2013-09-07 Darshit Shah <darnir@gmail.com>
* HTTPServer.py: Remove bunch of old code artefacts
* WgetTest.py: Same
2013-09-07 Darshit Shah <darnir@gmail.com>
* HTTPServer.py (StoppableHTTPServer.server_conf): Change global
variable fileSys to an object variable. This is good programming
practice and required for parallel-wget support.
(StoppableHTTPServer.server_forever): Edit overridden method to remove
the global queue variable. No longer required under the new working
(WgetHTTPRequestHandler.do_QUIT): Don't push fileSys through the queue
(_Handler): Rename class __Handler to _Handler to match Python's
encapsulation rules
(_Handler.do_POST): fileSys is now an object variable of the server
(_Handler.do_PUT): Same
(_Handler.send_put): Same
(_Handler.send_head): Same
(HTTPd): New class that wraps around the server for Threading
(create_server): Make new object of HTTPd.
(spawn_server): Start the thread created through create_server
(ret_fileSys): Removed method. No longer required.
* WgetTest.py (HTTPTest.__init__): Don't explicitly set
self.act_retcode. Instead toggle tests_passed boolean to set the
correct return code.
(HTTPTest.HTTP_setup): We no longer call HTTPServer.spawn_server to
start a new instance of the server.
(HTTPTest.init_HTTP_server): We no longer call the old
create_server(), spawn_server() methods. Instead use the new HTTPd
class interface to create new instances of the server
(HTTPTest.stop_HTTP_server): Don't ask server to return fileSys.
2013-09-07 Darshit Shah <darnir@gmail.com>
* Test-Post.py: Test basic functionality for sending HTTP POST
requests using the --method command
* Makefile.am: Add new test
2013-09-06 Darshit Shah <darnir@gmail.com>
* WgetTest.py (CommonMethods.__check_downloaded_files): Print a
unified diff in case there is a mismatch in the file contents
2013-09-06 Darshit Shah <darnir@gmail.com>
* HTTPServer.py (WgetHTTPRequestHandler.test_cookies): Comment out the
old test_cookies code. This is no longer used and was causing problems
with expected cookies. The code will soon be removed anyways
* Test-cookie.py: Add new test for basic cookie functionality
* Test-cookie-401.py: Ensure cookies are saved during a 401 response
* Test-cookie-expires.py: Ensure that the Expires field is correctly
handled
* Test-cookies-domain-mismatch.py: Ensure that mismatched domains are
handled by Wget
* Makefile.am: Add the new tests
2013-09-06 Darshit Shah <darnir@gmail.com>
* README: New section on pending work. Will keep updating this to keep
track of work that remains to be done on this implementation
2013-09-05 Darshit Shah <darnir@gmail.com>
* Test-auth-with-content-disposition.py: Add test that ensures Content
Disposition works alongwith authentication
* Makefile.am: Add new test
2013-09-04 Darshit Shah <darnir@gmail.com>
* Test-c-full.py: Test Continue options
* Makefile.am: Add Test-c-full.py and Test-O
2013-09-02 Darshit Shah <darnir@gmail.com>
* Makefile.am: Add new Test
* Test-Head.py: New Test to ensure HEAD requests are handled correctly
2013-08-31 Darshit Shah <darnir@gmail.com>
* README: Explain that TEST_NAME needs to be unique
* Test-auth-no-challenge.py: Edit non-unique TEST_NAME
2013-08-31 Darshit Shah <darnir@gmail.com>
* HTTPTest.py (ServerError): Define new Exception for handling
internal control flow.
(StoppableHTTPServer.SendHeader): Simply pass. Do nothing. Adding
functionality here seems to crash for no apparent reason.
(stoppableHTTPServer.send_cust_headers): Minor optimization. No need
for extra variable.
(__Handler.Response): Handle explicit Response Code Rules
(__Handler.Authentication): Handle Authentication rules
(__Handler.handle_auth): Actual worker method for authentication
(__Handler.ExpectHeader): Ensure Expected Headers are received
(__Handler.RejectHeader): Ensure Blacklisted Headers are NOT received
(__Handler.send_HEAD): Dynamically call server rule functions based on
the self.rules list. This feature will later be added to POST/PUT, etc
2013-08-31 Darshit Shah <darnir@gmail.com>
* WgetTest.py: Remove import module defaultdict.
(CommonMethods.get_server_rules): server_rules should be a dict, not a
defaultdict (list).
* HTTPServer.py (WgetHTTPRequestHandler.get_rule_list): If rule does
not exist, return None. Not an emppty list.
(WgetHTTPRequestHandler.test_cookies): Rule variable is not a list
(__Handler.send_cust_headers): Same
(__Handler.custom_response): Same
(__Handler.is_authorized): Same
(__Handler.expect_headers): Same
(__Handler.reject_headers): Same
2013-08-31 Darshit Shah <darnir@gmail.com>
* README: (newfile) Simple help / instructions about using the Test
Environment.
* Makefile.am: (newfile) Makefile for the Test Environment. Uses the
Automake Parallel Test Harness
* WgetTest.py: (newfile) Base module that executes the Test.
* HTTPServer.py: (newfile) Contains the custom HTTP Server for the
Test Environment. Creates an instance of http.server in Python3.
* FTPServer.py: (newfile) Overrides methods from pyftpdlib for use in
the Test Environment. ** Work under progress **.
* ColourTerm.py: (newfile) A custom module to output coloured text to
the terminal. Known to work on POSIX shells.
* Test-Proto.py: (newfile) A prototype Test File. This should be
copied when writing a new Test Case.
* Test-Content-disposition-2.py: Test Content Disposition clobbering
* Test-Content-disposition.py: Test Content Disposition Headers
* Test-O.py: Test Output filename command
* Test-auth-basic-fail.py: Test returncode on auth failure
* Test-auth-basic.py: Test Basic Auth negotiation
* Test-auth-both.py: Test handling of Multiple auth providers. This
test currently fails.
* Test-auth-digest.py: Test Digest Auth Negotiation
* Test-auth-no-challenge-url.py: Ensure --auth-no-challenge is handled
when auth details are in-URL.
* Test-auth-no-challenge.py: Ensure --auth-no-challenge is honoured
* Test-auth-retcode.py: Ensure correct return code after 403 Forbidden
response.

23
testenv/ColourTerm.py Normal file
View File

@ -0,0 +1,23 @@
import platform
from os import getenv
T_COLORS = {
'PURPLE' : '\033[95m',
'BLUE' : '\033[94m',
'GREEN' : '\033[92m',
'YELLOW' : '\033[93m',
'RED' : '\033[91m',
'ENDC' : '\033[0m'
}
def printer (color, string):
if platform.system () == 'Linux':
if getenv ("MAKE_CHECK", "False") == "True":
print (string)
else:
print (T_COLORS.get (color) + string + T_COLORS.get ('ENDC'))
else:
print (string)
# vim: set ts=8 sw=3 tw=0 et :

162
testenv/FTPServer.py Normal file
View File

@ -0,0 +1,162 @@
import os
import re
import threading
import socket
import pyftpdlib.__main__
from pyftpdlib.ioloop import IOLoop
import pyftpdlib.handlers as Handle
from pyftpdlib.servers import FTPServer
from pyftpdlib.authorizers import DummyAuthorizer
from pyftpdlib._compat import PY3, u, b, getcwdu, callable
class FTPDHandler (Handle.FTPHandler):
def ftp_LIST (self, path):
try:
iterator = self.run_as_current_user(self.fs.get_list_dir, path)
except (OSError, FilesystemError):
err = sys.exc_info()[1]
why = _strerror (err)
self.respond ('550 %s. ' % why)
else:
if self.isRule ("Bad List") is True:
iter_list = list ()
for flist in iterator:
line = re.compile (r'(\s+)').split (flist.decode ('utf-8'))
line[8] = '0'
iter_l = ''.join (line).encode ('utf-8')
iter_list.append (iter_l)
iterator = (n for n in iter_list)
producer = Handle.BufferedIteratorProducer (iterator)
self.push_dtp_data (producer, isproducer=True, cmd="LIST")
return path
def ftp_PASV (self, line):
if self._epsvall:
self.respond ("501 PASV not allowed after EPSV ALL.")
return
self._make_epasv(extmode=False)
if self.isRule ("FailPASV") is True:
del self.server.global_rules["FailPASV"]
self.socket.close ()
def isRule (self, rule):
rule_obj = self.server.global_rules[rule]
return False if not rule_obj else rule_obj[0]
class FTPDServer (FTPServer):
def set_global_rules (self, rules):
self.global_rules = rules
class FTPd(threading.Thread):
"""A threaded FTP server used for running tests.
This is basically a modified version of the FTPServer class which
wraps the polling loop into a thread.
The instance returned can be used to start(), stop() and
eventually re-start() the server.
"""
handler = FTPDHandler
server_class = FTPDServer
def __init__(self, addr=None):
os.mkdir ('server')
os.chdir ('server')
try:
HOST = socket.gethostbyname ('localhost')
except socket.error:
HOST = 'localhost'
USER = 'user'
PASSWD = '12345'
HOME = getcwdu ()
threading.Thread.__init__(self)
self.__serving = False
self.__stopped = False
self.__lock = threading.Lock()
self.__flag = threading.Event()
if addr is None:
addr = (HOST, 0)
authorizer = DummyAuthorizer()
authorizer.add_user(USER, PASSWD, HOME, perm='elradfmwM') # full perms
authorizer.add_anonymous(HOME)
self.handler.authorizer = authorizer
# lowering buffer sizes = more cycles to transfer data
# = less false positive test failures
self.handler.dtp_handler.ac_in_buffer_size = 32768
self.handler.dtp_handler.ac_out_buffer_size = 32768
self.server = self.server_class(addr, self.handler)
self.host, self.port = self.server.socket.getsockname()[:2]
os.chdir ('..')
def set_global_rules (self, rules):
self.server.set_global_rules (rules)
def __repr__(self):
status = [self.__class__.__module__ + "." + self.__class__.__name__]
if self.__serving:
status.append('active')
else:
status.append('inactive')
status.append('%s:%s' % self.server.socket.getsockname()[:2])
return '<%s at %#x>' % (' '.join(status), id(self))
@property
def running(self):
return self.__serving
def start(self, timeout=0.001):
"""Start serving until an explicit stop() request.
Polls for shutdown every 'timeout' seconds.
"""
if self.__serving:
raise RuntimeError("Server already started")
if self.__stopped:
# ensure the server can be started again
FTPd.__init__(self, self.server.socket.getsockname(), self.handler)
self.__timeout = timeout
threading.Thread.start(self)
self.__flag.wait()
def run(self):
self.__serving = True
self.__flag.set()
while self.__serving:
self.__lock.acquire()
self.server.serve_forever(timeout=self.__timeout, blocking=False)
self.__lock.release()
self.server.close_all()
def stop(self):
"""Stop serving (also disconnecting all currently connected
clients) by telling the serve_forever() loop to stop and
waits until it does.
"""
if not self.__serving:
raise RuntimeError("Server not started yet")
self.__serving = False
self.__stopped = True
self.join()
def mk_file_sys (file_list):
os.chdir ('server')
for name, content in file_list.items ():
file_h = open (name, 'w')
file_h.write (content)
file_h.close ()
os.chdir ('..')
def filesys ():
fileSys = dict ()
os.chdir ('server')
for parent, dirs, files in os.walk ('.'):
for filename in files:
file_handle = open (filename, 'r')
file_content = file_handle.read ()
fileSys[filename] = file_content
os.chdir ('..')
return fileSys

430
testenv/HTTPServer.py Normal file
View File

@ -0,0 +1,430 @@
from http.server import HTTPServer, BaseHTTPRequestHandler
from posixpath import basename, splitext
from base64 import b64encode
from random import random
from hashlib import md5
import threading
import re
class InvalidRangeHeader (Exception):
""" Create an Exception for handling of invalid Range Headers. """
# TODO: Eliminate this exception and use only ServerError
def __init__ (self, err_message):
self.err_message = err_message
class ServerError (Exception):
def __init__ (self, err_message):
self.err_message = err_message
class StoppableHTTPServer (HTTPServer):
request_headers = list ()
""" Define methods for configuring the Server. """
def server_conf (self, filelist, conf_dict):
""" Set Server Rules and File System for this instance. """
self.server_configs = conf_dict
self.fileSys = filelist
def get_req_headers (self):
return self.request_headers
class WgetHTTPRequestHandler (BaseHTTPRequestHandler):
""" Define methods for handling Test Checks. """
def get_rule_list (self, name):
r_list = self.rules.get (name) if name in self.rules else None
return r_list
class _Handler (WgetHTTPRequestHandler):
""" Define Handler Methods for different Requests. """
InvalidRangeHeader = InvalidRangeHeader
protocol_version = 'HTTP/1.1'
""" Define functions for various HTTP Requests. """
def do_HEAD (self):
self.send_head ("HEAD")
def do_GET (self):
content, start = self.send_head ("GET")
if content:
if start is None:
self.wfile.write (content.encode ('utf-8'))
else:
self.wfile.write (content.encode ('utf-8')[start:])
def do_POST (self):
path = self.path[1:]
self.rules = self.server.server_configs.get (path)
if not self.custom_response ():
return (None, None)
if path in self.server.fileSys:
body_data = self.get_body_data ()
self.send_response (200)
self.send_header ("Content-type", "text/plain")
content = self.server.fileSys.pop (path) + "\n" + body_data
total_length = len (content)
self.server.fileSys[path] = content
self.send_header ("Content-Length", total_length)
self.finish_headers ()
try:
self.wfile.write (content.encode ('utf-8'))
except Exception:
pass
else:
self.send_put (path)
def do_PUT (self):
path = self.path[1:]
self.rules = self.server.server_configs.get (path)
if not self.custom_response ():
return (None, None)
self.server.fileSys.pop (path, None)
self.send_put (path)
""" End of HTTP Request Method Handlers. """
""" Helper functions for the Handlers. """
def parse_range_header (self, header_line, length):
if header_line is None:
return None
if not header_line.startswith ("bytes="):
raise InvalidRangeHeader ("Cannot parse header Range: %s" %
(header_line))
regex = re.match (r"^bytes=(\d*)\-$", header_line)
range_start = int (regex.group (1))
if range_start >= length:
raise InvalidRangeHeader ("Range Overflow")
return range_start
def get_body_data (self):
cLength_header = self.headers.get ("Content-Length")
cLength = int (cLength_header) if cLength_header is not None else 0
body_data = self.rfile.read (cLength).decode ('utf-8')
return body_data
def send_put (self, path):
body_data = self.get_body_data ()
self.send_response (201)
self.server.fileSys[path] = body_data
self.send_header ("Content-type", "text/plain")
self.send_header ("Content-Length", len (body_data))
self.finish_headers ()
try:
self.wfile.write (body_data.encode ('utf-8'))
except Exception:
pass
def SendHeader (self, header_obj):
pass
# headers_list = header_obj.headers
# for header_line in headers_list:
# print (header_line + " : " + headers_list[header_line])
# self.send_header (header_line, headers_list[header_line])
def send_cust_headers (self):
header_obj = self.get_rule_list ('SendHeader')
if header_obj:
for header in header_obj.headers:
self.send_header (header, header_obj.headers[header])
def finish_headers (self):
self.send_cust_headers ()
self.end_headers ()
def Response (self, resp_obj):
self.send_response (resp_obj.response_code)
self.finish_headers ()
raise ServerError ("Custom Response code sent.")
def custom_response (self):
codes = self.get_rule_list ('Response')
if codes:
self.send_response (codes.response_code)
self.finish_headers ()
return False
else:
return True
def base64 (self, data):
string = b64encode (data.encode ('utf-8'))
return string.decode ('utf-8')
def send_challenge (self, auth_type):
if auth_type == "Both":
self.send_challenge ("Digest")
self.send_challenge ("Basic")
return
if auth_type == "Basic":
challenge_str = 'Basic realm="Wget-Test"'
elif auth_type == "Digest" or auth_type == "Both_inline":
self.nonce = md5 (str (random ()).encode ('utf-8')).hexdigest ()
self.opaque = md5 (str (random ()).encode ('utf-8')).hexdigest ()
challenge_str = 'Digest realm="Test", nonce="%s", opaque="%s"' %(
self.nonce,
self.opaque)
challenge_str += ', qop="auth"'
if auth_type == "Both_inline":
challenge_str = 'Basic realm="Wget-Test", ' + challenge_str
self.send_header ("WWW-Authenticate", challenge_str)
def authorize_Basic (self, auth_header, auth_rule):
if auth_header is None or auth_header.split(' ')[0] != 'Basic':
return False
else:
self.user = auth_rule.auth_user
self.passw = auth_rule.auth_pass
auth_str = "Basic " + self.base64 (self.user + ":" + self.passw)
return True if auth_str == auth_header else False
def parse_auth_header (self, auth_header):
n = len("Digest ")
auth_header = auth_header[n:].strip()
items = auth_header.split(", ")
key_values = [i.split("=", 1) for i in items]
key_values = [(k.strip(), v.strip().replace('"', '')) for k, v in key_values]
return dict(key_values)
def KD (self, secret, data):
return self.H (secret + ":" + data)
def H (self, data):
return md5 (data.encode ('utf-8')).hexdigest ()
def A1 (self):
return "%s:%s:%s" % (self.user, "Test", self.passw)
def A2 (self, params):
return "%s:%s" % (self.command, params["uri"])
def check_response (self, params):
if "qop" in params:
data_str = params['nonce'] \
+ ":" + params['nc'] \
+ ":" + params['cnonce'] \
+ ":" + params['qop'] \
+ ":" + self.H (self.A2 (params))
else:
data_str = params['nonce'] + ":" + self.H (self.A2 (params))
resp = self.KD (self.H (self.A1 ()), data_str)
return True if resp == params['response'] else False
def authorize_Digest (self, auth_header, auth_rule):
if auth_header is None or auth_header.split(' ')[0] != 'Digest':
return False
else:
self.user = auth_rule.auth_user
self.passw = auth_rule.auth_pass
params = self.parse_auth_header (auth_header)
pass_auth = True
if self.user != params['username'] or \
self.nonce != params['nonce'] or self.opaque != params['opaque']:
pass_auth = False
req_attribs = ['username', 'realm', 'nonce', 'uri', 'response']
for attrib in req_attribs:
if not attrib in params:
pass_auth = False
if not self.check_response (params):
pass_auth = False
return pass_auth
def authorize_Both (self, auth_header, auth_rule):
return False
def authorize_Both_inline (self, auth_header, auth_rule):
return False
def Authentication (self, auth_rule):
try:
self.handle_auth (auth_rule)
except ServerError as se:
self.send_response (401, "Authorization Required")
self.send_challenge (auth_rule.auth_type)
self.finish_headers ()
raise ServerError (se.__str__())
def handle_auth (self, auth_rule):
is_auth = True
auth_header = self.headers.get ("Authorization")
required_auth = auth_rule.auth_type
if required_auth == "Both" or required_auth == "Both_inline":
auth_type = auth_header.split(' ')[0] if auth_header else required_auth
else:
auth_type = required_auth
assert hasattr (self, "authorize_" + auth_type)
is_auth = getattr (self, "authorize_" + auth_type) (auth_header, auth_rule)
if is_auth is False:
raise ServerError ("Unable to Authenticate")
def is_authorized (self):
is_auth = True
auth_rule = self.get_rule_list ('Authentication')
if auth_rule:
auth_header = self.headers.get ("Authorization")
req_auth = auth_rule.auth_type
if req_auth == "Both" or req_auth == "Both_inline":
auth_type = auth_header.split(' ')[0] if auth_header else req_auth
else:
auth_type = req_auth
assert hasattr (self, "authorize_" + auth_type)
is_auth = getattr (self, "authorize_" + auth_type) (auth_header, auth_rule)
if is_auth is False:
self.send_response (401)
self.send_challenge (auth_type)
self.finish_headers ()
return is_auth
def ExpectHeader (self, header_obj):
exp_headers = header_obj.headers
for header_line in exp_headers:
header_recd = self.headers.get (header_line)
if header_recd is None or header_recd != exp_headers[header_line]:
self.send_error (400, "Expected Header " + header_line + " not found")
self.finish_headers ()
raise ServerError ("Header " + header_line + " not found")
def expect_headers (self):
""" This is modified code to handle a few changes. Should be removed ASAP """
exp_headers_obj = self.get_rule_list ('ExpectHeader')
if exp_headers_obj:
exp_headers = exp_headers_obj.headers
for header_line in exp_headers:
header_re = self.headers.get (header_line)
if header_re is None or header_re != exp_headers[header_line]:
self.send_error (400, 'Expected Header not Found')
self.end_headers ()
return False
return True
def RejectHeader (self, header_obj):
rej_headers = header_obj.headers
for header_line in rej_headers:
header_recd = self.headers.get (header_line)
if header_recd is not None and header_recd == rej_headers[header_line]:
self.send_error (400, 'Blackisted Header ' + header_line + ' received')
self.finish_headers ()
raise ServerError ("Header " + header_line + ' received')
def reject_headers (self):
rej_headers = self.get_rule_list ("RejectHeader")
if rej_headers:
rej_headers = rej_headers.headers
for header_line in rej_headers:
header_re = self.headers.get (header_line)
if header_re is not None and header_re == rej_headers[header_line]:
self.send_error (400, 'Blacklisted Header was Sent')
self.end_headers ()
return False
return True
def __log_request (self, method):
req = method + " " + self.path
self.server.request_headers.append (req)
def send_head (self, method):
""" Common code for GET and HEAD Commands.
This method is overriden to use the fileSys dict.
The method variable contains whether this was a HEAD or a GET Request.
According to RFC 2616, the server should not differentiate between
the two requests, however, we use it here for a specific test.
"""
if self.path == "/":
path = "index.html"
else:
path = self.path[1:]
self.__log_request (method)
if path in self.server.fileSys:
self.rules = self.server.server_configs.get (path)
for rule_name in self.rules:
try:
assert hasattr (self, rule_name)
getattr (self, rule_name) (self.rules [rule_name])
except AssertionError as ae:
msg = "Method " + rule_name + " not defined"
self.send_error (500, msg)
return (None, None)
except ServerError as se:
print (se.__str__())
return (None, None)
content = self.server.fileSys.get (path)
content_length = len (content)
try:
self.range_begin = self.parse_range_header (
self.headers.get ("Range"), content_length)
except InvalidRangeHeader as ae:
# self.log_error("%s", ae.err_message)
if ae.err_message == "Range Overflow":
self.send_response (416)
self.finish_headers ()
return (None, None)
else:
self.range_begin = None
if self.range_begin is None:
self.send_response (200)
else:
self.send_response (206)
self.send_header ("Accept-Ranges", "bytes")
self.send_header ("Content-Range",
"bytes %d-%d/%d" % (self.range_begin,
content_length - 1,
content_length))
content_length -= self.range_begin
cont_type = self.guess_type (path)
self.send_header ("Content-type", cont_type)
self.send_header ("Content-Length", content_length)
self.finish_headers ()
return (content, self.range_begin)
else:
self.send_error (404, "Not Found")
return (None, None)
def guess_type (self, path):
base_name = basename ("/" + path)
name, ext = splitext (base_name)
extension_map = {
".txt" : "text/plain",
".css" : "text/css",
".html" : "text/html"
}
if ext in extension_map:
return extension_map[ext]
else:
return "text/plain"
class HTTPd (threading.Thread):
server_class = StoppableHTTPServer
handler = _Handler
def __init__ (self, addr=None):
threading.Thread.__init__ (self)
if addr is None:
addr = ('localhost', 0)
self.server_inst = self.server_class (addr, self.handler)
self.server_address = self.server_inst.socket.getsockname()[:2]
def run (self):
self.server_inst.serve_forever ()
def server_conf (self, file_list, server_rules):
self.server_inst.server_conf (file_list, server_rules)
# vim: set ts=8 sts=4 sw=3 tw=0 et :

53
testenv/Makefile.am Normal file
View File

@ -0,0 +1,53 @@
# Makefile for `wget' utility
# Copyright (C) 2013 Free Software Foundation, Inc.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with Wget. If not, see <http://www.gnu.org/licenses/>.
# Additional permission under GNU GPL version 3 section 7
# If you modify this program, or any covered work, by linking or
# combining it with the OpenSSL project's OpenSSL library (or a
# modified version of that library), containing parts covered by the
# terms of the OpenSSL or SSLeay licenses, the Free Software Foundation
# grants you additional permission to convey the resulting work.
# Corresponding Source for a non-source form of such a combination
# shall include the source code for the parts of OpenSSL used as well
# as that of the covered work.
AUTOMAKE_OPTIONS = parallel-tests
AM_TESTS_ENVIRONMENT = MAKE_CHECK=True; export MAKE_CHECK;
TESTS = Test-auth-basic-fail.py \
Test-auth-basic.py \
Test-auth-both.py \
Test-auth-digest.py \
Test-auth-no-challenge.py \
Test-auth-no-challenge-url.py \
Test-auth-retcode.py \
Test-auth-with-content-disposition.py \
Test-c-full.py \
Test-Content-disposition-2.py \
Test-Content-disposition.py \
Test-cookie-401.py \
Test-cookie-domain-mismatch.py \
Test-cookie-expires.py \
Test-cookie.py \
Test-Head.py \
Test-O.py \
Test-Post.py \
Test--spider-r.py
XFAIL_TESTS = Test-auth-both.py \
Test--spider-r.py
LOG_COMPILER = python3

262
testenv/README Normal file
View File

@ -0,0 +1,262 @@
This document describes the working of the GNU Wget Test Suite.
Install Instructions:
================================================================================
This Test Suite exploits the Parallel Test Harness available in GNU Autotools.
Since it uses features from a relatively recent verion of Autotools, the minimum
required version as been bumped up to 1.11.
Run the './configure' command to generate the Makefile and then run 'make check'
to execute the Test Suite. Use the '-j n' option with 'make check' to execute
n tests simultaneously.
File List:
================================================================================
* HTTPServer.py: This file contains a custom, programmatically configurable
HTTP Server for testing Wget. It runs an instance of Python's http.server
module.
* WgetTest.py: This file contains various functions and global variables for
each instance of the server that is initiated. It includes functions to
start and stop the server, to initialze the test environment and to cleanup
after a test.
* Test-Proto.py: This is a prototype Test Case file. The file defines all
the acceptable elements and their uses. Typically, one must copy this file
and edit it for writing Test Cases.
* ColourTerm.py: A custom library for printing coloured output to the
terminal. Currently it only supports 4 colours in a *nix environment.
Working:
================================================================================
The Test Files are valid Python scripts and the default mask for them is 755.
A singular Test must be invoked in the following manner:
$ ./python3 <Name of Test File> OR
$ ./<Name of Test File>
The script will then initialize the various elements and pass them to an object
of the respective Test Class. A directory with the name <Test name>-test will be
created and the PWD will be changed to this directory. The server is then
spawned with the required configuration elements. A blocking call to Wget is
made with the command line arguments specified in the Test Case along with the
list of URLs that it must download. The server is killed once Wget returns and
the following checks are used to determine the pass/fail status of the test:
* Return Code: The Exit code of Wget is matched against the expected Exit
Code as mentioned in the Test Case File.
* Downloaded Files: Check whether the expected downloaded files exist on
disk.
* File Content: Test whether the file contents were correctly downloaded by
Wget and not corrupted mid-way.
* Excess Files: Check to see whether any unexpected files were downloaded
by Wget.
Exit Codes:
===============================================================================
Following is a list of Exit Status Codes for the tests:
* 0 Test Successful
* 66 Errors/Warnings Reported by Thread Sanitizer (If built with -fsanitize)
* 77 Test Skipped
* 99 Hard Error
* 100 Test Failed
Tests are skipped when they are either not supported by the platform, or Wget
is not compiled with support for that feature. This feature has not yet been
implemented.
Hard Errors occur when there are problems with the Environment code. Hard
Error reporting is currently not enabled and all errors are reported as
failures.
All exceptions should ideally be handled gracefully. If you see any unhandled
exceptions, please file a bug report at <bug-wget@gnu.org>
Environment Variables:
================================================================================
TODO: Set variables for ONLY_SERVER and WGET_PATH too.
* NO_CLEANUP: Do not remove the temporary files created by the test.
This will prevent the ${testname}-test directory from being deleted
File Structure:
================================================================================
The test case files are Python scripts. It is believed that Python is a simple
yet elegant language and should be easy for everyone to comprehend. This test
suite is written with the objective of making it easy to write new tests. The
structure has been kept as intuitive as possible and should not require much
effort to get accustomed to.
All Test Files MUST begin with the following Three Lines:
#!/usr/bin/python3
from sys import exit
from WgetTest import {HTTPTest|FTPTest}, WgetFile
It is recommended that a small description of the Test Case is provided next.
This would be very helpful to future contributors.
Next, is the const variable, TEST_NAME that defines the name of the Test.
Each File in the Test must be represented as a WgetFile object. The WgetFile
Class has the following prototype:
WgetFile (String name, String contents, String timestamp, dict rules)
None except name is a mandatory paramter, one may pass only those parameters
that are required by the File object.
The timestamp string should be a valid Unix Timestamp as defined in RFC xxxx.
The rules object is a dictionary element, with the key as the Rule Name and
value as the Rule Data. In most cases, the Rule Data is another dictionary.
Various variables used consistently across all tests are:
* WGET_OPTIONS: The command line string passed to Wget upon invokation. This
string may contain URLs, like in the case where in-URL authentication is
used. Variable names passed like {{var_name}} will be replaced by the
contents of the variable self.var_name before being passed to Wget
* WGET_URLS: This is a list of filenames which will be appended as the URLs
to Wget during invokation. This is a list of lists, where WGET_URLS[0]
represents the list of Filenames called from Server[0], WGET_URLS[1] is a
list of files downloaded from Server[2], etc.
* Files: This variable defines the files that exist in the Server's
filesystem. The Files variable is a list of lists of WgetFile objects.
This means that File[0] is a list of WgetFile objects that lie on Server[0],
File[1] a list of files on Server[1] and so on.
* Existing_Files: This is a list of files that already exist in the
directory from which Wget is invoked.
* ExpectedReturnCode: The Exit Code expected to be returned by Wget after
the test.
* ExpectedDownloadedFiles: A list of files that are expected in the local
directory after Wget has finished executing. This does not include the files
already existing before Wget was launched and must be mentioned again.
* Request_List: An unordered list of Requests that each server must receive.
This too is a list of lists and follows the same convention as others above.
Both, the HTTPTest and FTPTest modules have the same prototype:
{
name,
pre_hook,
test_options,
post_hook,
servers
}
name expects the string name, and is usually passed the TEST_NAME variable,
the three hooks, expect python dictionary objects and servers is an integer.
Valid File Rules:
================================================================================
This section lists the currently supported File Rules and their structure.
* Authentication: Used when a File must require Authorization for access.
The value for this key is the following dictionary:
|-->Type : Basic|Digest|Both|Both_inline
|-->User : <Username>
--->Pass : <Password>
* ExpectHeader : The following Headers MUST exist in every Request for the
File. The value for this key is a dictionary object where each header is
represented as:
|-->Header Name : <Header Data>
* RejectHeader : This list of Headers must NEVER occur in a request. It
uses the same value format as ExpectHeader.
* SendHeader : This list of Headers will be sent in EVERY response to a
request for the respective file. It follows the same value format as
ExpectHeader.
* Response : The HTTP Response Code to send to a request for this File.
The value is an Integer that represents a valid HTTP Response Code.
Pre Test Hooks:
================================================================================
The Pre-Test Hooks are executed just after starting the server and just before
spawning an instance of the server. These are usually used for setting up the
Test Environment and Server Rules. The currently supported Pre-Test Hooks are:
* ServerFiles : A list of WgetFile objects that must exist on the Server
* LocalFiles : A list of WgetFile objects that exist locally on disk
before Wget is executed.
Since pre_test is a dictionary, one may not assume that the hooks will be
executed in the same order as they are defined.
Test Options:
================================================================================
The test_options dictionary defines the commands to be used when the Test is
executed. The currently supported options are:
* Urls : A list of the filenames that Wget must attempt to
download. The complete URL will be created and passed to Wget automatically.
* WgetCommands : A string consisting of the various commandline switches
sent to Wget upon invokation. Any data placed between {{ }} in this string
will be replaced with the contents of self.<data> before being passed to
Wget. This is particularly useful for getting the hostname and port for a
file. While all Download URL's are passed to Urls, a notable exception is
when in-url authentication is used. In such a case, the URL is specified in
the WgetCommands string.
Post-Test Hooks:
================================================================================
These hooks are executed as soon as the call to Wget returns. The post-test
hooks are usually used to run checks on the data, files downloaded, return code,
etc. The following hooks are currently supported:
* ExpectedRetcode : This is an integer value of the ReturnCode with which
Wget is expected to exit.
* ExpectedFiles : This is a list of WgetFile objects of the files that
must exist locally on disk in the Test directory.
* FilesCrawled : This requires a list of the Requests that the server is
expected to receive. The order is un-important since it will vary on the
parallel-wget branch. This hook is used in tests for Recursive mode to
ensure that the website is traversed correctly.
Writing New Tests:
================================================================================
See Test-Proto.py for an example of how to write Test Case files. The
recommended method for writing new Test Case files is to copy Test-Proto.py and
modify it to ones needs.
In case you require any functionality that is not currently defined in List of
Rules defined above, you should add the required code in WgetTest.py.
In most cases, one requires a new Rule to be added for the Server to follow.
In such a case, create a new Class in WgetTest.py with the same name as the Rule
and define an __init__ () function to handle the data. A method must also be
defined in HTTPTest / FTPTest modules to handle the said Rule.
Once a new Test File is created, it must be added to the TESTS variable in
Makefile.am. This way the Test will be executed on running a 'make check'.
If a Test is expected to fail on the current master branch, then the Test should
also be added to the XFAIL_TESTS variable. This will allow expected failures to
pass through. If a test mentioned in the XFAIL_TESTS variable passes, it gets
red-flagged as a XPASS.
Remember to always name the Test correctly using the TEST_NAME variable. This
is essential since a directory with the Test Name is created and this can
cause synchronization problems when the Parallel Test Harness is used.
One can use the following command on Unix systems to check for TEST_NAME
clashes:
$ grep -r -h "TEST_NAME =" | cut -c13- | uniq -c -d
Work Remaining:
================================================================================
Some amount of work still remains to be done.
* Errors in server-side checks need to be handled more explicitly
* Support parallel-wget branch
* Support to spawn multiple servers is already in place. Need to handle
multiple requests to a server simultaneously. Use THreading MixIn.
* SSL Tests. Use xyne's HTTPS server implemention
* Complete support for FTP Tests
* IRI Support. This shouldn't require much effort
Requirements:
================================================================================
1. Python >= 3.0
2. Automake >= 1.11

105
testenv/Test--spider-r.py Executable file
View File

@ -0,0 +1,105 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test executed Wget in Spider mode with recursive retrieval.
"""
TEST_NAME = "Recursive Spider"
############# File Definitions ###############################################
mainpage = """
<html>
<head>
<title>Main Page</title>
</head>
<body>
<p>
Some text and a link to a <a href="http://127.0.0.1:{{port}}/secondpage.html">second page</a>.
Also, a <a href="http://127.0.0.1:{{port}}/nonexistent">broken link</a>.
</p>
</body>
</html>
"""
secondpage = """
<html>
<head>
<title>Second Page</title>
</head>
<body>
<p>
Some text and a link to a <a href="http://127.0.0.1:{{port}}/thirdpage.html">third page</a>.
Also, a <a href="http://127.0.0.1:{{port}}/nonexistent">broken link</a>.
</p>
</body>
</html>
"""
thirdpage = """
<html>
<head>
<title>Third Page</title>
</head>
<body>
<p>
Some text and a link to a <a href="http://127.0.0.1:{{port}}/dummy.txt">text file</a>.
Also, another <a href="http://127.0.0.1:{{port}}/againnonexistent">broken link</a>.
</p>
</body>
</html>
"""
dummyfile = "Don't care."
index_html = WgetFile ("index.html", mainpage)
secondpage_html = WgetFile ("secondpage.html", secondpage)
thirdpage_html = WgetFile ("thirdpage.html", thirdpage)
dummy_txt = WgetFile ("dummy.txt", dummyfile)
Request_List = [
[
"HEAD /",
"GET /",
"GET /robots.txt",
"HEAD /secondpage.html",
"GET /secondpage.html",
"HEAD /nonexistent",
"HEAD /thirdpage.html",
"GET /thirdpage.html",
"HEAD /dummy.txt",
"HEAD /againnonexistent"
]
]
WGET_OPTIONS = "-d --spider -r"
WGET_URLS = [[""]]
Files = [[index_html, secondpage_html, thirdpage_html, dummy_txt]]
ExpectedReturnCode = 8
ExpectedDownloadedFiles = []
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode,
"FilesCrawled" : Request_List
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget parses the Content-Disposition header
correctly and creates the appropriate file when the said filename exists.
"""
TEST_NAME = "Content Disposition Clobber"
############# File Definitions ###############################################
File1 = "Teapot"
File2 = "The Teapot Protocol"
File2_rules = {
"SendHeader" : {
"Content-Disposition" : "Attachment; filename=HTTP.Teapot"
}
}
A_File = WgetFile ("HTTP.Teapot", File1)
B_File = WgetFile ("File2", File2, rules=File2_rules)
WGET_OPTIONS = "-d --content-disposition"
WGET_URLS = [["File2"]]
Files = [[B_File]]
Existing_Files = [A_File]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [WgetFile ("HTTP.Teapot.1", File2), A_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files,
"LocalFiles" : Existing_Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

View File

@ -0,0 +1,55 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget parses the Content-Disposition header
correctly and creates a local file accordingly.
"""
TEST_NAME = "Content Disposition Header"
############# File Definitions ###############################################
File1 = """All that is gold does not glitter,
Not all those who wander are lost;
The old that is strong does not wither,
Deep roots are not reached by the frost.
From the ashes a fire shall be woken,
A light from the shadows shall spring;
Renewed shall be blade that was broken,
The crownless again shall be king."""
File1_rules = {
"SendHeader" : {
"Content-Disposition" : "Attachment; filename=JRR.Tolkein"
}
}
A_File = WgetFile ("LOTR", File1, rules=File1_rules)
WGET_OPTIONS = "-d --content-disposition"
WGET_URLS = [["LOTR"]]
Files = [[A_File]]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [WgetFile ("JRR.Tolkein", File1)]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

43
testenv/Test-Head.py Executable file
View File

@ -0,0 +1,43 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget correctly handles responses to HEAD requests
and does not actually download any data
"""
TEST_NAME = "HEAD Requests"
############# File Definitions ###############################################
File1 = "You shall not pass!"
A_File = WgetFile ("File1", File1)
WGET_OPTIONS = "-d --method=HEAD"
WGET_URLS = [["File1"]]
Files = [[A_File]]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = []
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files,
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

44
testenv/Test-O.py Executable file
View File

@ -0,0 +1,44 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget correctly handles the -O command for output
filenames.
"""
TEST_NAME = "Output Filename Command"
############# File Definitions ###############################################
File1 = "Test Contents."
A_File = WgetFile ("File1", File1)
WGET_OPTIONS = "-d -O NewFile.txt"
WGET_URLS = [["File1"]]
Files = [[A_File]]
ExistingFiles = [A_File]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [WgetFile ("NewFile.txt", File1)]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

52
testenv/Test-Parallel-Proto.py Executable file
View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This is a Prototype Test File for multiple servers.
Ideally this File should be copied and edited to write new tests.
"""
TEST_NAME = "Parallel Prototype"
############# File Definitions ###############################################
File1 = "Would you like some Tea?"
File2 = "With lemon or cream?"
File3 = "Sure you're joking Mr. Feynman"
A_File = WgetFile ("File1", File1)
B_File = WgetFile ("File2", File2)
C_File = WgetFile ("File3", File3)
WGET_OPTIONS = "-d"
WGET_URLS = [["File1"], ["File2"]]
Files = [[A_File], [B_File]]
Existing_Files = [C_File]
no_of_servers = 2
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File, B_File, C_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files,
"LocalFiles" : Existing_Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test,
servers=no_of_servers
).begin ()
exit (err)

47
testenv/Test-Post.py Executable file
View File

@ -0,0 +1,47 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
Simple test for HTTP POST Requests usiong the --method command
"""
TEST_NAME = "HTTP POST Requests"
############# File Definitions ###############################################
File1 = """A reader lives a thousand lives before he dies, said Jojen.
The man who never reads lives only one"""
File1_response = """A reader lives a thousand lives before he dies, said Jojen.
The man who never reads lives only one
TestMessage"""
A_File = WgetFile ("File1", File1)
WGET_OPTIONS = "-d --method=post --body-data=TestMessage"
WGET_URLS = [["File1"]]
Files = [[A_File]]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [WgetFile ("File1", File1_response)]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

70
testenv/Test-Proto.py Executable file
View File

@ -0,0 +1,70 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This is a Prototype Test File.
Ideally this File should be copied and edited to write new tests.
"""
TEST_NAME = "Prototype"
############# File Definitions ###############################################
File1 = "Would you like some Tea?"
File2 = "With lemon or cream?"
File3 = "Sure you're joking Mr. Feynman"
File1_rules = {
"Authentication" : {
"Type" : "Both",
"User" : "Sauron",
"Pass" : "TheEye"
},
"RejectHeader" : {
"Authorization" : "Basic U2F1cm9uOlRoZUV5ZQ=="
}
}
File2_rules = {
"Authentication" : {
"Type" : "Both_inline",
"User" : "Sauron",
"Pass" : "TheEye"
},
"SendHeader" : {
"Content-Disposition" : "Attachment; filename=newfile"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
B_File = WgetFile ("File2", File2, rules=File2_rules)
C_File = WgetFile ("File3", File3)
WGET_OPTIONS = "-d --content-disposition --user=Sauron --password=TheEye"
WGET_URLS = [["File1", "File2"]]
Files = [[A_File, B_File]]
Existing_Files = [C_File]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File, WgetFile ("newfile", File2), C_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files,
"LocalFiles" : Existing_Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

50
testenv/Test-auth-basic-fail.py Executable file
View File

@ -0,0 +1,50 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget returns the correct exit code when Basic
authentcation failes due to a username/password error.
"""
TEST_NAME = "Basic Authentication Failure"
############# File Definitions ###############################################
File1 = "I am an invisble man."
File1_rules = {
"Authentication" : {
"Type" : "Basic",
"User" : "Sauron",
"Pass" : "TheEye"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
WGET_OPTIONS = "-d --user=Sauron --password=Eye"
WGET_URLS = [["File1"]]
Files = [[A_File]]
ExpectedReturnCode = 6
ExpectedDownloadedFiles = []
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

58
testenv/Test-auth-basic.py Executable file
View File

@ -0,0 +1,58 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures Wget's Basic Authorization Negotiation.
Also, we ensure that Wget saves the host after a successfull auth and
doesn't wait for a challenge the second time.
"""
TEST_NAME = "Basic Authorization"
############# File Definitions ###############################################
File1 = "I am an invisble man."
File2 = "I too am an invisible man."
File1_rules = {
"Authentication" : {
"Type" : "Basic",
"User" : "Sauron",
"Pass" : "TheEye"
}
}
File2_rules = {
"ExpectHeader" : {
"Authorization" : "Basic U2F1cm9uOlRoZUV5ZQ=="
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
B_File = WgetFile ("File2", File2, rules=File2_rules)
WGET_OPTIONS = "-d --user=Sauron --password=TheEye"
WGET_URLS = [["File1", "File2"]]
Files = [[A_File, B_File]]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File, B_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

76
testenv/Test-auth-both.py Executable file
View File

@ -0,0 +1,76 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures Wget's Basic Authorization Negotiation.
Also, we ensure that Wget saves the host after a successfull auth and
doesn't wait for a challenge the second time.
"""
TEST_NAME = "Multiple authentication support"
############# File Definitions ###############################################
File1 = "Would you like some Tea?"
File2 = "With lemon or cream?"
File3 = "Sure you're joking Mr. Feynman"
File1_rules = {
"Authentication" : {
"Type" : "Both",
"User" : "Sauron",
"Pass" : "TheEye"
},
"RejectHeader" : {
"Authorization" : "Basic U2F1cm9uOlRoZUV5ZQ=="
}
}
File2_rules = {
"Authentication" : {
"Type" : "Both_inline",
"User" : "Sauron",
"Pass" : "TheEye"
},
"RejectHeader" : {
"Authorization" : "Basic U2F1cm9uOlRoZUV5ZQ=="
}
}
File3_rules = {
"Authentication" : {
"Type" : "Digest",
"User" : "Sauron",
"Pass" : "TheEye"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
B_File = WgetFile ("File2", File2, rules=File2_rules)
C_File = WgetFile ("File3", File3, rules=File3_rules)
WGET_OPTIONS = "-d --user=Sauron --password=TheEye"
WGET_URLS = [["File1", "File2", "File3"]]
Files = [[A_File, B_File, C_File]]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File, B_File, C_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

49
testenv/Test-auth-digest.py Executable file
View File

@ -0,0 +1,49 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures Wget's Digest Authorization Negotiation.
"""
TEST_NAME = "Digest Authorization"
############# File Definitions ###############################################
File1 = "Need a cookie?"
File1_rules = {
"Authentication" : {
"Type" : "Digest",
"User" : "Pacman",
"Pass" : "Omnomnom"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
WGET_OPTIONS = "-d --user=Pacman --password=Omnomnom"
WGET_URLS = [["File1"]]
Files = [[A_File]]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

View File

@ -0,0 +1,53 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures Wget's Basic Authorization Negotiation, when credentials
are provided in-URL
"""
TEST_NAME = "Auth no challenge in URL"
############# File Definitions ###############################################
File1 = "Need a cookie?"
File1_rules = {
"Authentication" : {
"Type" : "Basic",
"User" : "Pacman",
"Pass" : "Omnomnom"
},
"ExpectHeader" : {
"Authorization" : "Basic UGFjbWFuOk9tbm9tbm9t"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
WGET_OPTIONS = "-d --auth-no-challenge http://Pacman:Omnomnom@localhost:{{port}}/File1"
WGET_URLS = [[]]
Files = [[A_File]]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

View File

@ -0,0 +1,53 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures Wget's Basic Authorization Negotiation, when the
--auth-no-challenge command is used.
"""
TEST_NAME = "Auth No Challenge Command"
############# File Definitions ###############################################
File1 = "Need a cookie?"
File1_rules = {
"Authentication" : {
"Type" : "Basic",
"User" : "Pacman",
"Pass" : "Omnomnom"
},
"ExpectHeader" : {
"Authorization" : "Basic UGFjbWFuOk9tbm9tbm9t"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
WGET_OPTIONS = "-d --auth-no-challenge --user=Pacman --password=Omnomnom"
WGET_URLS = [["File1"]]
Files = [[A_File]]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

49
testenv/Test-auth-retcode.py Executable file
View File

@ -0,0 +1,49 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget returns the correct return code when sent
a 403 Forbidden by the Server.
"""
TEST_NAME = "Forbidden Retcode"
############# File Definitions ###############################################
File1 = "Apples and Oranges? Really?"
File1_rules = {
"Response" : 403
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
WGET_OPTIONS = "-d"
WGET_URLS = [["File1"]]
Files = [[A_File]]
ExpectedReturnCode = 8
ExpectedDownloadedFiles = []
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

View File

@ -0,0 +1,53 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget handles Content-Disposition correctly when
coupled with Authentication
"""
TEST_NAME = "Authentication with Content Disposition"
############# File Definitions ###############################################
File1 = "Need a cookie?"
File1_rules = {
"Authentication" : {
"Type" : "Basic",
"User" : "Pacman",
"Pass" : "Omnomnom"
},
"SendHeader" : {
"Content-Disposition" : "Attachment; filename=Arch"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
WGET_OPTIONS = "-d --user=Pacman --password=Omnomnom --content-disposition"
WGET_URLS = [["File1"]]
Files = [[A_File]]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [WgetFile ("Arch", File1)]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

52
testenv/Test-c-full.py Executable file
View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
Test Wget's response when the file requested already exists on disk with
a filesize greater than or equal to the requested file.
"""
TEST_NAME = "Test continue option"
############# File Definitions ###############################################
File1 = "abababababababababababababababababababababababababababababababababab"
File2 = "ababababababababababababababababababab"
A_File = WgetFile ("File1", File1)
B_File = WgetFile ("File1", File1)
C_File = WgetFile ("File2", File1)
D_File = WgetFile ("File2", File2)
E_File = WgetFile ("File3", File1)
WGET_OPTIONS = "-d -c"
WGET_URLS = [["File1", "File2", "File3"]]
Files = [[A_File, C_File, E_File]]
Existing_Files = [B_File, D_File]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File, C_File, E_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files,
"LocalFiles" : Existing_Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

58
testenv/Test-cookie-401.py Executable file
View File

@ -0,0 +1,58 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget stores the cookie even in the event of a
401 Unauthorized Response
"""
TEST_NAME = "Basic Cookie 401 Response"
############# File Definitions ###############################################
File1 = """All happy families are alike;
Each unhappy family is unhappy in its own way"""
File2 = "Anyone for chocochip cookies?"
File1_rules = {
"SendHeader" : {
"Set-Cookie" : "sess-id=0213; path=/"
},
"Response" : 401
}
File2_rules = {
"ExpectHeader" : {
"Cookie" : "sess-id=0213"
},
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
B_File = WgetFile ("File2", File2, rules=File2_rules)
WGET_OPTIONS = "-d"
WGET_URLS = [["File1", "File2"]]
Files = [[A_File, B_File]]
ExpectedReturnCode = 6
ExpectedDownloadedFiles = [B_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

View File

@ -0,0 +1,56 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget identifies bad servers trying to set cookies
for a different domain and rejects them.
"""
TEST_NAME = "Cookie Domain Mismatch"
############# File Definitions ###############################################
File1 = "Would you care for a cup of coffee?"
File2 = "Anyone for chocochip cookies?"
File1_rules = {
"SendHeader" : {
"Set-Cookie" : "sess-id=0213; path=/; domain=.example.com"
}
}
File2_rules = {
"RejectHeader" : {
"Cookie" : "sess-id=0213"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
B_File = WgetFile ("File2", File2, rules=File2_rules)
WGET_OPTIONS = "-d"
WGET_URLS = [["File1", "File2"]]
Files = [[A_File, B_File]]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File, B_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

79
testenv/Test-cookie-expires.py Executable file
View File

@ -0,0 +1,79 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget handles Cookie expiry dates correctly.
Simultaneuously, we also check if multiple cookies to the same domain
are handled correctly
"""
TEST_NAME = "Cookie Expires"
############# File Definitions ###############################################
File1 = "Hello World!"
File2 = "'Ello! This is Amazing!"
File3 = "So what are we looking at?"
File4 = "This was downloaded"
File1_rules = {
"SendHeader" : {
"Set-Cookie" : "sess-id=0213; path=/"
}
}
File2_rules = {
"ExpectHeader" : {
"Cookie" : "sess-id=0213"
},
"SendHeader" : {
"Set-Cookie" : "new-sess=N"
}
}
File3_rules = {
"SendHeader" : {
"Set-Cookie" : "sess-id=0213; path=/; Expires=Sun, 06 Nov 2001 12:32:43 GMT"
},
"ExpectHeader" : {
"Cookie" : "new-sess=N; sess-id=0213"
}
}
File4_rules = {
"RejectHeader" : {
"Cookie" : "sess-id=0213"
},
"ExpectHeader" : {
"Cookie" : "new-sess=N"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
B_File = WgetFile ("File2", File2, rules=File2_rules)
C_File = WgetFile ("File3", File3, rules=File3_rules)
D_File = WgetFile ("File4", File4, rules=File4_rules)
WGET_OPTIONS = "-d"
WGET_URLS = [["File1", "File2", "File3", "File4"]]
Files = [[A_File, B_File, C_File, D_File]]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File, B_File, C_File, D_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

56
testenv/Test-cookie.py Executable file
View File

@ -0,0 +1,56 @@
#!/usr/bin/env python3
from sys import exit
from WgetTest import HTTPTest, WgetFile
"""
This test ensures that Wget's cookie jar support works correctly.
"""
TEST_NAME = "Basic Cookie Functionality"
############# File Definitions ###############################################
File1 = """All happy families are alike;
Each unhappy family is unhappy in its own way"""
File2 = "Anyone for chocochip cookies?"
File1_rules = {
"SendHeader" : {
"Set-Cookie" : "sess-id=0213; path=/"
}
}
File2_rules = {
"ExpectHeader" : {
"Cookie" : "sess-id=0213"
}
}
A_File = WgetFile ("File1", File1, rules=File1_rules)
B_File = WgetFile ("File2", File2, rules=File2_rules)
WGET_OPTIONS = "-d"
WGET_URLS = [["File1", "File2"]]
Files = [[A_File, B_File]]
ExpectedReturnCode = 0
ExpectedDownloadedFiles = [A_File, B_File]
################ Pre and Post Test Hooks #####################################
pre_test = {
"ServerFiles" : Files
}
test_options = {
"WgetCommands" : WGET_OPTIONS,
"Urls" : WGET_URLS
}
post_test = {
"ExpectedFiles" : ExpectedDownloadedFiles,
"ExpectedRetcode" : ExpectedReturnCode
}
err = HTTPTest (
name=TEST_NAME,
pre_hook=pre_test,
test_params=test_options,
post_hook=post_test
).begin ()
exit (err)

302
testenv/WgetTest.py Normal file
View File

@ -0,0 +1,302 @@
import os
import shutil
import shlex
import sys
import traceback
import HTTPServer
import re
from subprocess import call
from ColourTerm import printer
from difflib import unified_diff
""" A Custom Exception raised by the Test Environment. """
class TestFailed (Exception):
def __init__ (self, error):
self.error = error
""" Class that defines methods common to both HTTP and FTP Tests. """
class CommonMethods:
TestFailed = TestFailed
def init_test_env (self, name):
testDir = name + "-test"
try:
os.mkdir (testDir)
except FileExistsError:
shutil.rmtree (testDir)
os.mkdir (testDir)
os.chdir (testDir)
self.tests_passed = True
def get_domain_addr (self, addr):
self.port = str (addr[1])
return addr[0] + ":" + str(addr[1]) + "/"
def exec_wget (self, options, urls, domain_list):
cmd_line = self.get_cmd_line (options, urls, domain_list)
params = shlex.split (cmd_line)
print (params)
retcode = call (params)
return retcode
def get_cmd_line (self, options, urls, domain_list):
TEST_PATH = os.path.abspath (".")
WGET_PATH = os.path.join (TEST_PATH, "..", "..", "src", "wget")
WGET_PATH = os.path.abspath (WGET_PATH)
cmd_line = WGET_PATH + " " + options + " "
for i in range (0, self.servers):
for url in urls[i]:
cmd_line += domain_list[i] + url + " "
# for url in urls:
# cmd_line += domain_list[0] + url + " "
print (cmd_line)
return cmd_line
def __test_cleanup (self):
testDir = self.name + "-test"
os.chdir ('..')
try:
if os.getenv ("NO_CLEANUP") is None:
shutil.rmtree (testDir)
except Exception as ae:
print ("Unknown Exception while trying to remove Test Environment.")
def _exit_test (self):
self.__test_cleanup ()
def begin (self):
return 0 if self.tests_passed else 100
""" Methods to check if the Test Case passes or not. """
def __gen_local_filesys (self):
file_sys = dict ()
for parent, dirs, files in os.walk ('.'):
for name in files:
onefile = dict ()
# Create the full path to file, removing the leading ./
# Might not work on non-unix systems. Someone please test.
filepath = os.path.join (parent, name)
file_handle = open (filepath, 'r')
file_content = file_handle.read ()
onefile['content'] = file_content
filepath = filepath[2:]
file_sys[filepath] = onefile
file_handle.close ()
return file_sys
def __check_downloaded_files (self, exp_filesys):
local_filesys = self.__gen_local_filesys ()
for files in exp_filesys:
if files.name in local_filesys:
local_file = local_filesys.pop (files.name)
if files.content != local_file ['content']:
for line in unified_diff (local_file['content'], files.content, fromfile="Actual", tofile="Expected"):
sys.stderr.write (line)
raise TestFailed ("Contents of " + files.name + " do not match")
else:
raise TestFailed ("Expected file " + files.name + " not found")
if local_filesys:
print (local_filesys)
raise TestFailed ("Extra files downloaded.")
def _replace_substring (self, string):
pattern = re.compile ('\{\{\w+\}\}')
match_obj = pattern.search (string)
if match_obj is not None:
rep = match_obj.group()
temp = getattr (self, rep.strip ('{}'))
string = string.replace (rep, temp)
return string
""" Test Rule Definitions """
""" This should really be taken out soon. All this extra stuff to ensure
re-use of old code is crap. Someone needs to re-write it. The new rework
branch is much better written, but integrating it requires effort.
All these classes should never exist. The whole server needs to modified.
"""
class Authentication:
def __init__ (self, auth_obj):
self.auth_type = auth_obj['Type']
self.auth_user = auth_obj['User']
self.auth_pass = auth_obj['Pass']
class ExpectHeader:
def __init__ (self, header_obj):
self.headers = header_obj
class RejectHeader:
def __init__ (self, header_obj):
self.headers = header_obj
class Response:
def __init__ (self, retcode):
self.response_code = retcode
class SendHeader:
def __init__ (self, header_obj):
self.headers = header_obj
def get_server_rules (self, file_obj):
""" The handling of expect header could be made much better when the
options are parsed in a true and better fashion. For an example,
see the commented portion in Test-basic-auth.py.
"""
server_rules = dict ()
for rule in file_obj.rules:
r_obj = getattr (self, rule) (file_obj.rules[rule])
server_rules[rule] = r_obj
return server_rules
""" Pre-Test Hook Function Calls """
def ServerFiles (self, server_files):
for i in range (0, self.servers):
file_list = dict ()
server_rules = dict ()
for file_obj in server_files[i]:
content = self._replace_substring (file_obj.content)
file_list[file_obj.name] = content
rule_obj = self.get_server_rules (file_obj)
server_rules[file_obj.name] = rule_obj
self.server_list[i].server_conf (file_list, server_rules)
def LocalFiles (self, local_files):
for file_obj in local_files:
file_handler = open (file_obj.name, "w")
file_handler.write (file_obj.content)
file_handler.close ()
""" Test Option Function Calls """
def WgetCommands (self, command_list):
self.options = self._replace_substring (command_list)
def Urls (self, url_list):
self.urls = url_list
""" Post-Test Hook Function Calls """
def ExpectedRetcode (self, retcode):
if self.act_retcode != retcode:
pr = "Return codes do not match.\nExpected: " + str(retcode) + "\nActual: " + str(self.act_retcode)
raise TestFailed (pr)
def ExpectedFiles (self, exp_filesys):
self.__check_downloaded_files (exp_filesys)
def FilesCrawled (self, Request_Headers):
for i in range (0, self.servers):
headers = set(Request_Headers[i])
o_headers = self.Request_remaining[i]
header_diff = headers.symmetric_difference (o_headers)
if len(header_diff) is not 0:
printer ("RED", str (header_diff))
raise TestFailed ("Not all files were crawled correctly")
""" Class for HTTP Tests. """
class HTTPTest (CommonMethods):
# Temp Notes: It is expected that when pre-hook functions are executed, only an empty test-dir exists.
# pre-hook functions are executed just prior to the call to Wget is made.
# post-hook functions will be executed immediately after the call to Wget returns.
def __init__ (
self,
name="Unnamed Test",
pre_hook=dict(),
test_params=dict(),
post_hook=dict(),
servers=1
):
try:
self.HTTP_setup (name, pre_hook, test_params, post_hook, servers)
except TestFailed as tf:
printer ("RED", "Error: " + tf.error)
self.tests_passed = False
except Exception as ae:
printer ("RED", "Unhandled Exception Caught.")
print ( ae.__str__ ())
traceback.print_exc ()
self.tests_passed = False
else:
printer ("GREEN", "Test Passed")
finally:
self._exit_test ()
def HTTP_setup (self, name, pre_hook, test_params, post_hook, servers):
self.name = name
self.servers = servers
printer ("BLUE", "Running Test " + self.name)
self.init_test_env (name)
self.server_list = list()
self.domain_list = list()
for server_number in range (0, servers):
server_inst = self.init_HTTP_Server ()
self.server_list.append (server_inst)
domain = self.get_domain_addr (server_inst.server_address)
self.domain_list.append (domain)
#self.server = self.init_HTTP_Server ()
#self.domain = self.get_domain_addr (self.server.server_address)
for pre_hook_func in pre_hook:
try:
assert hasattr (self, pre_hook_func)
except AssertionError as ae:
self.stop_HTTP_Server (self.server)
raise TestFailed ("Pre Test Function " + pre_hook_func + " not defined.")
getattr (self, pre_hook_func) (pre_hook[pre_hook_func])
for test_func in test_params:
try:
assert hasattr (self, test_func)
except AssertionError as ae:
self.stop_HTTP_Server (self.server)
raise TestFailed ("Test Option " + test_func + " unknown.")
getattr (self, test_func) (test_params[test_func])
self.act_retcode = self.exec_wget (self.options, self.urls, self.domain_list)
self.stop_HTTP_Server ()
for post_hook_func in post_hook:
try:
assert hasattr (self, post_hook_func)
except AssertionError as ae:
raise TestFailed ("Post Test Function " + post_hook_func + " not defined.")
getattr (self, post_hook_func) (post_hook[post_hook_func])
def init_HTTP_Server (self):
server = HTTPServer.HTTPd ()
server.start ()
return server
def stop_HTTP_Server (self):
self.Request_remaining = list ()
for server in self.server_list:
server_req = server.server_inst.get_req_headers ()
self.Request_remaining.append (server_req)
server.server_inst.shutdown ()
""" WgetFile is a File Data Container object """
class WgetFile:
def __init__ (
self,
name,
content="Test Contents",
timestamp=None,
rules=dict()
):
self.name = name
self.content = content
self.timestamp = timestamp
self.rules = rules