mirror of
				https://github.com/saitohirga/WSJT-X.git
				synced 2025-11-04 05:50:31 -05:00 
			
		
		
		
	
		
			
	
	
		
			590 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			590 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
								 | 
							
								"""
							 | 
						||
| 
								 | 
							
								TestCmd.py:  a testing framework for commands and scripts.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								The TestCmd module provides a framework for portable automated testing of
							 | 
						||
| 
								 | 
							
								executable commands and scripts (in any language, not just Python), especially
							 | 
						||
| 
								 | 
							
								commands and scripts that require file system interaction.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								In addition to running tests and evaluating conditions, the TestCmd module
							 | 
						||
| 
								 | 
							
								manages and cleans up one or more temporary workspace directories, and provides
							 | 
						||
| 
								 | 
							
								methods for creating files and directories in those workspace directories from
							 | 
						||
| 
								 | 
							
								in-line data, here-documents), allowing tests to be completely self-contained.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								A TestCmd environment object is created via the usual invocation:
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    test = TestCmd()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								The TestCmd module provides pass_test(), fail_test(), and no_result() unbound
							 | 
						||
| 
								 | 
							
								methods that report test results for use with the Aegis change management
							 | 
						||
| 
								 | 
							
								system. These methods terminate the test immediately, reporting PASSED, FAILED
							 | 
						||
| 
								 | 
							
								or NO RESULT respectively and exiting with status 0 (success), 1 or 2
							 | 
						||
| 
								 | 
							
								respectively. This allows for a distinction between an actual failed test and a
							 | 
						||
| 
								 | 
							
								test that could not be properly evaluated because of an external condition (such
							 | 
						||
| 
								 | 
							
								as a full file system or incorrect permissions).
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								"""
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Copyright 2000 Steven Knight
							 | 
						||
| 
								 | 
							
								# This module is free software, and you may redistribute it and/or modify
							 | 
						||
| 
								 | 
							
								# it under the same terms as Python itself, so long as this copyright message
							 | 
						||
| 
								 | 
							
								# and disclaimer are retained in their original form.
							 | 
						||
| 
								 | 
							
								#
							 | 
						||
| 
								 | 
							
								# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
							 | 
						||
| 
								 | 
							
								# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
							 | 
						||
| 
								 | 
							
								# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
							 | 
						||
| 
								 | 
							
								# DAMAGE.
							 | 
						||
| 
								 | 
							
								#
							 | 
						||
| 
								 | 
							
								# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
							 | 
						||
| 
								 | 
							
								# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
							 | 
						||
| 
								 | 
							
								# PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
							 | 
						||
| 
								 | 
							
								# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
							 | 
						||
| 
								 | 
							
								# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Copyright 2002-2003 Vladimir Prus.
							 | 
						||
| 
								 | 
							
								# Copyright 2002-2003 Dave Abrahams.
							 | 
						||
| 
								 | 
							
								# Copyright 2006 Rene Rivera.
							 | 
						||
| 
								 | 
							
								# Distributed under the Boost Software License, Version 1.0.
							 | 
						||
| 
								 | 
							
								#    (See accompanying file LICENSE_1_0.txt or copy at
							 | 
						||
| 
								 | 
							
								#         http://www.boost.org/LICENSE_1_0.txt)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from string import join, split
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								__author__ = "Steven Knight <knight@baldmt.com>"
							 | 
						||
| 
								 | 
							
								__revision__ = "TestCmd.py 0.D002 2001/08/31 14:56:12 software"
							 | 
						||
| 
								 | 
							
								__version__ = "0.02"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from types import *
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import os
							 | 
						||
| 
								 | 
							
								import os.path
							 | 
						||
| 
								 | 
							
								import re
							 | 
						||
| 
								 | 
							
								import shutil
							 | 
						||
| 
								 | 
							
								import stat
							 | 
						||
| 
								 | 
							
								import subprocess
							 | 
						||
| 
								 | 
							
								import sys
							 | 
						||
| 
								 | 
							
								import tempfile
							 | 
						||
| 
								 | 
							
								import traceback
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								tempfile.template = 'testcmd.'
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								_Cleanup = []
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def _clean():
							 | 
						||
| 
								 | 
							
								    global _Cleanup
							 | 
						||
| 
								 | 
							
								    list = _Cleanup[:]
							 | 
						||
| 
								 | 
							
								    _Cleanup = []
							 | 
						||
| 
								 | 
							
								    list.reverse()
							 | 
						||
| 
								 | 
							
								    for test in list:
							 | 
						||
| 
								 | 
							
								        test.cleanup()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								sys.exitfunc = _clean
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def caller(tblist, skip):
							 | 
						||
| 
								 | 
							
								    string = ""
							 | 
						||
| 
								 | 
							
								    arr = []
							 | 
						||
| 
								 | 
							
								    for file, line, name, text in tblist:
							 | 
						||
| 
								 | 
							
								        if file[-10:] == "TestCmd.py":
							 | 
						||
| 
								 | 
							
								                break
							 | 
						||
| 
								 | 
							
								        arr = [(file, line, name, text)] + arr
							 | 
						||
| 
								 | 
							
								    atfrom = "at"
							 | 
						||
| 
								 | 
							
								    for file, line, name, text in arr[skip:]:
							 | 
						||
| 
								 | 
							
								        if name == "?":
							 | 
						||
| 
								 | 
							
								            name = ""
							 | 
						||
| 
								 | 
							
								        else:
							 | 
						||
| 
								 | 
							
								            name = " (" + name + ")"
							 | 
						||
| 
								 | 
							
								        string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name))
							 | 
						||
| 
								 | 
							
								        atfrom = "\tfrom"
							 | 
						||
| 
								 | 
							
								    return string
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def fail_test(self=None, condition=True, function=None, skip=0):
							 | 
						||
| 
								 | 
							
								    """Cause the test to fail.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								      By default, the fail_test() method reports that the test FAILED and exits
							 | 
						||
| 
								 | 
							
								    with a status of 1. If a condition argument is supplied, the test fails
							 | 
						||
| 
								 | 
							
								    only if the condition is true.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    """
							 | 
						||
| 
								 | 
							
								    if not condition:
							 | 
						||
| 
								 | 
							
								        return
							 | 
						||
| 
								 | 
							
								    if not function is None:
							 | 
						||
| 
								 | 
							
								        function()
							 | 
						||
| 
								 | 
							
								    of = ""
							 | 
						||
| 
								 | 
							
								    desc = ""
							 | 
						||
| 
								 | 
							
								    sep = " "
							 | 
						||
| 
								 | 
							
								    if not self is None:
							 | 
						||
| 
								 | 
							
								        if self.program:
							 | 
						||
| 
								 | 
							
								            of = " of " + join(self.program, " ")
							 | 
						||
| 
								 | 
							
								            sep = "\n\t"
							 | 
						||
| 
								 | 
							
								        if self.description:
							 | 
						||
| 
								 | 
							
								            desc = " [" + self.description + "]"
							 | 
						||
| 
								 | 
							
								            sep = "\n\t"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    at = caller(traceback.extract_stack(), skip)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    sys.stderr.write("FAILED test" + of + desc + sep + at + """
							 | 
						||
| 
								 | 
							
								in directory: """ + os.getcwd() )
							 | 
						||
| 
								 | 
							
								    sys.exit(1)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def no_result(self=None, condition=True, function=None, skip=0):
							 | 
						||
| 
								 | 
							
								    """Causes a test to exit with no valid result.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								      By default, the no_result() method reports NO RESULT for the test and
							 | 
						||
| 
								 | 
							
								    exits with a status of 2. If a condition argument is supplied, the test
							 | 
						||
| 
								 | 
							
								    fails only if the condition is true.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    """
							 | 
						||
| 
								 | 
							
								    if not condition:
							 | 
						||
| 
								 | 
							
								        return
							 | 
						||
| 
								 | 
							
								    if not function is None:
							 | 
						||
| 
								 | 
							
								        function()
							 | 
						||
| 
								 | 
							
								    of = ""
							 | 
						||
| 
								 | 
							
								    desc = ""
							 | 
						||
| 
								 | 
							
								    sep = " "
							 | 
						||
| 
								 | 
							
								    if not self is None:
							 | 
						||
| 
								 | 
							
								        if self.program:
							 | 
						||
| 
								 | 
							
								            of = " of " + self.program
							 | 
						||
| 
								 | 
							
								            sep = "\n\t"
							 | 
						||
| 
								 | 
							
								        if self.description:
							 | 
						||
| 
								 | 
							
								            desc = " [" + self.description + "]"
							 | 
						||
| 
								 | 
							
								            sep = "\n\t"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    at = caller(traceback.extract_stack(), skip)
							 | 
						||
| 
								 | 
							
								    sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
							 | 
						||
| 
								 | 
							
								    sys.exit(2)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def pass_test(self=None, condition=True, function=None):
							 | 
						||
| 
								 | 
							
								    """Causes a test to pass.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								      By default, the pass_test() method reports PASSED for the test and exits
							 | 
						||
| 
								 | 
							
								    with a status of 0. If a condition argument is supplied, the test passes
							 | 
						||
| 
								 | 
							
								    only if the condition is true.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    """
							 | 
						||
| 
								 | 
							
								    if not condition:
							 | 
						||
| 
								 | 
							
								        return
							 | 
						||
| 
								 | 
							
								    if not function is None:
							 | 
						||
| 
								 | 
							
								        function()
							 | 
						||
| 
								 | 
							
								    sys.stderr.write("PASSED\n")
							 | 
						||
| 
								 | 
							
								    sys.exit(0)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def match_exact(lines=None, matches=None):
							 | 
						||
| 
								 | 
							
								    """
							 | 
						||
| 
								 | 
							
								      Returns whether the given lists or strings containing lines separated
							 | 
						||
| 
								 | 
							
								    using newline characters contain exactly the same data.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    """
							 | 
						||
| 
								 | 
							
								    if not type(lines) is ListType:
							 | 
						||
| 
								 | 
							
								        lines = split(lines, "\n")
							 | 
						||
| 
								 | 
							
								    if not type(matches) is ListType:
							 | 
						||
| 
								 | 
							
								        matches = split(matches, "\n")
							 | 
						||
| 
								 | 
							
								    if len(lines) != len(matches):
							 | 
						||
| 
								 | 
							
								        return
							 | 
						||
| 
								 | 
							
								    for i in range(len(lines)):
							 | 
						||
| 
								 | 
							
								        if lines[i] != matches[i]:
							 | 
						||
| 
								 | 
							
								            return
							 | 
						||
| 
								 | 
							
								    return 1
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def match_re(lines=None, res=None):
							 | 
						||
| 
								 | 
							
								    """
							 | 
						||
| 
								 | 
							
								      Given lists or strings contain lines separated using newline characters.
							 | 
						||
| 
								 | 
							
								    This function matches those lines one by one, interpreting the lines in the
							 | 
						||
| 
								 | 
							
								    res parameter as regular expressions.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    """
							 | 
						||
| 
								 | 
							
								    if not type(lines) is ListType:
							 | 
						||
| 
								 | 
							
								        lines = split(lines, "\n")
							 | 
						||
| 
								 | 
							
								    if not type(res) is ListType:
							 | 
						||
| 
								 | 
							
								        res = split(res, "\n")
							 | 
						||
| 
								 | 
							
								    if len(lines) != len(res):
							 | 
						||
| 
								 | 
							
								        return
							 | 
						||
| 
								 | 
							
								    for i in range(len(lines)):
							 | 
						||
| 
								 | 
							
								        if not re.compile("^" + res[i] + "$").search(lines[i]):
							 | 
						||
| 
								 | 
							
								            return
							 | 
						||
| 
								 | 
							
								    return 1
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class TestCmd:
							 | 
						||
| 
								 | 
							
								    def __init__(self, description=None, program=None, workdir=None,
							 | 
						||
| 
								 | 
							
								        subdir=None, verbose=False, match=None, inpath=None):
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        self._cwd = os.getcwd()
							 | 
						||
| 
								 | 
							
								        self.description_set(description)
							 | 
						||
| 
								 | 
							
								        self.program_set(program, inpath)
							 | 
						||
| 
								 | 
							
								        self.verbose_set(verbose)
							 | 
						||
| 
								 | 
							
								        if match is None:
							 | 
						||
| 
								 | 
							
								            self.match_func = match_re
							 | 
						||
| 
								 | 
							
								        else:
							 | 
						||
| 
								 | 
							
								            self.match_func = match
							 | 
						||
| 
								 | 
							
								        self._dirlist = []
							 | 
						||
| 
								 | 
							
								        self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
							 | 
						||
| 
								 | 
							
								        env = os.environ.get('PRESERVE')
							 | 
						||
| 
								 | 
							
								        if env:
							 | 
						||
| 
								 | 
							
								            self._preserve['pass_test'] = env
							 | 
						||
| 
								 | 
							
								            self._preserve['fail_test'] = env
							 | 
						||
| 
								 | 
							
								            self._preserve['no_result'] = env
							 | 
						||
| 
								 | 
							
								        else:
							 | 
						||
| 
								 | 
							
								            env = os.environ.get('PRESERVE_PASS')
							 | 
						||
| 
								 | 
							
								            if env is not None:
							 | 
						||
| 
								 | 
							
								                self._preserve['pass_test'] = env
							 | 
						||
| 
								 | 
							
								            env = os.environ.get('PRESERVE_FAIL')
							 | 
						||
| 
								 | 
							
								            if env is not None:
							 | 
						||
| 
								 | 
							
								                self._preserve['fail_test'] = env
							 | 
						||
| 
								 | 
							
								            env = os.environ.get('PRESERVE_PASS')
							 | 
						||
| 
								 | 
							
								            if env is not None:
							 | 
						||
| 
								 | 
							
								                self._preserve['PRESERVE_NO_RESULT'] = env
							 | 
						||
| 
								 | 
							
								        self._stdout = []
							 | 
						||
| 
								 | 
							
								        self._stderr = []
							 | 
						||
| 
								 | 
							
								        self.status = None
							 | 
						||
| 
								 | 
							
								        self.condition = 'no_result'
							 | 
						||
| 
								 | 
							
								        self.workdir_set(workdir)
							 | 
						||
| 
								 | 
							
								        self.subdir(subdir)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def __del__(self):
							 | 
						||
| 
								 | 
							
								        self.cleanup()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def __repr__(self):
							 | 
						||
| 
								 | 
							
								        return "%x" % id(self)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def cleanup(self, condition=None):
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								          Removes any temporary working directories for the specified TestCmd
							 | 
						||
| 
								 | 
							
								        environment. If the environment variable PRESERVE was set when the
							 | 
						||
| 
								 | 
							
								        TestCmd environment was created, temporary working directories are not
							 | 
						||
| 
								 | 
							
								        removed. If any of the environment variables PRESERVE_PASS,
							 | 
						||
| 
								 | 
							
								        PRESERVE_FAIL or PRESERVE_NO_RESULT were set when the TestCmd
							 | 
						||
| 
								 | 
							
								        environment was created, then temporary working directories are not
							 | 
						||
| 
								 | 
							
								        removed if the test passed, failed or had no result, respectively.
							 | 
						||
| 
								 | 
							
								        Temporary working directories are also preserved for conditions
							 | 
						||
| 
								 | 
							
								        specified via the preserve method.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								          Typically, this method is not called directly, but is used when the
							 | 
						||
| 
								 | 
							
								        script exits to clean up temporary working directories as appropriate
							 | 
						||
| 
								 | 
							
								        for the exit status.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								        if not self._dirlist:
							 | 
						||
| 
								 | 
							
								            return
							 | 
						||
| 
								 | 
							
								        if condition is None:
							 | 
						||
| 
								 | 
							
								            condition = self.condition
							 | 
						||
| 
								 | 
							
								        if self._preserve[condition]:
							 | 
						||
| 
								 | 
							
								            for dir in self._dirlist:
							 | 
						||
| 
								 | 
							
								                print("Preserved directory %s" % dir)
							 | 
						||
| 
								 | 
							
								        else:
							 | 
						||
| 
								 | 
							
								            list = self._dirlist[:]
							 | 
						||
| 
								 | 
							
								            list.reverse()
							 | 
						||
| 
								 | 
							
								            for dir in list:
							 | 
						||
| 
								 | 
							
								                self.writable(dir, 1)
							 | 
						||
| 
								 | 
							
								                shutil.rmtree(dir, ignore_errors=1)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        self._dirlist = []
							 | 
						||
| 
								 | 
							
								        self.workdir = None
							 | 
						||
| 
								 | 
							
								        os.chdir(self._cwd)
							 | 
						||
| 
								 | 
							
								        try:
							 | 
						||
| 
								 | 
							
								            global _Cleanup
							 | 
						||
| 
								 | 
							
								            _Cleanup.remove(self)
							 | 
						||
| 
								 | 
							
								        except (AttributeError, ValueError):
							 | 
						||
| 
								 | 
							
								            pass
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def description_set(self, description):
							 | 
						||
| 
								 | 
							
								        """Set the description of the functionality being tested."""
							 | 
						||
| 
								 | 
							
								        self.description = description
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def fail_test(self, condition=True, function=None, skip=0):
							 | 
						||
| 
								 | 
							
								        """Cause the test to fail."""
							 | 
						||
| 
								 | 
							
								        if not condition:
							 | 
						||
| 
								 | 
							
								            return
							 | 
						||
| 
								 | 
							
								        self.condition = 'fail_test'
							 | 
						||
| 
								 | 
							
								        fail_test(self = self,
							 | 
						||
| 
								 | 
							
								                  condition = condition,
							 | 
						||
| 
								 | 
							
								                  function = function,
							 | 
						||
| 
								 | 
							
								                  skip = skip)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def match(self, lines, matches):
							 | 
						||
| 
								 | 
							
								        """Compare actual and expected file contents."""
							 | 
						||
| 
								 | 
							
								        return self.match_func(lines, matches)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def match_exact(self, lines, matches):
							 | 
						||
| 
								 | 
							
								        """Compare actual and expected file content exactly."""
							 | 
						||
| 
								 | 
							
								        return match_exact(lines, matches)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def match_re(self, lines, res):
							 | 
						||
| 
								 | 
							
								        """Compare file content with a regular expression."""
							 | 
						||
| 
								 | 
							
								        return match_re(lines, res)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def no_result(self, condition=True, function=None, skip=0):
							 | 
						||
| 
								 | 
							
								        """Report that the test could not be run."""
							 | 
						||
| 
								 | 
							
								        if not condition:
							 | 
						||
| 
								 | 
							
								            return
							 | 
						||
| 
								 | 
							
								        self.condition = 'no_result'
							 | 
						||
| 
								 | 
							
								        no_result(self = self,
							 | 
						||
| 
								 | 
							
								                  condition = condition,
							 | 
						||
| 
								 | 
							
								                  function = function,
							 | 
						||
| 
								 | 
							
								                  skip = skip)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def pass_test(self, condition=True, function=None):
							 | 
						||
| 
								 | 
							
								        """Cause the test to pass."""
							 | 
						||
| 
								 | 
							
								        if not condition:
							 | 
						||
| 
								 | 
							
								            return
							 | 
						||
| 
								 | 
							
								        self.condition = 'pass_test'
							 | 
						||
| 
								 | 
							
								        pass_test(self, condition, function)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def preserve(self, *conditions):
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								          Arrange for the temporary working directories for the specified
							 | 
						||
| 
								 | 
							
								        TestCmd environment to be preserved for one or more conditions. If no
							 | 
						||
| 
								 | 
							
								        conditions are specified, arranges for the temporary working
							 | 
						||
| 
								 | 
							
								        directories to be preserved for all conditions.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								        if conditions is ():
							 | 
						||
| 
								 | 
							
								            conditions = ('pass_test', 'fail_test', 'no_result')
							 | 
						||
| 
								 | 
							
								        for cond in conditions:
							 | 
						||
| 
								 | 
							
								            self._preserve[cond] = 1
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def program_set(self, program, inpath):
							 | 
						||
| 
								 | 
							
								        """Set the executable program or script to be tested."""
							 | 
						||
| 
								 | 
							
								        if not inpath and program and not os.path.isabs(program[0]):
							 | 
						||
| 
								 | 
							
								            program[0] = os.path.join(self._cwd, program[0])
							 | 
						||
| 
								 | 
							
								        self.program = program
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def read(self, file, mode='rb'):
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								          Reads and returns the contents of the specified file name. The file
							 | 
						||
| 
								 | 
							
								        name may be a list, in which case the elements are concatenated with
							 | 
						||
| 
								 | 
							
								        the os.path.join() method. The file is assumed to be under the
							 | 
						||
| 
								 | 
							
								        temporary working directory unless it is an absolute path name. The I/O
							 | 
						||
| 
								 | 
							
								        mode for the file may be specified and must begin with an 'r'. The
							 | 
						||
| 
								 | 
							
								        default is 'rb' (binary read).
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								        if type(file) is ListType:
							 | 
						||
| 
								 | 
							
								            file = apply(os.path.join, tuple(file))
							 | 
						||
| 
								 | 
							
								        if not os.path.isabs(file):
							 | 
						||
| 
								 | 
							
								            file = os.path.join(self.workdir, file)
							 | 
						||
| 
								 | 
							
								        if mode[0] != 'r':
							 | 
						||
| 
								 | 
							
								            raise ValueError, "mode must begin with 'r'"
							 | 
						||
| 
								 | 
							
								        return open(file, mode).read()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def run(self, program=None, arguments=None, chdir=None, stdin=None,
							 | 
						||
| 
								 | 
							
								        universal_newlines=True):
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								          Runs a test of the program or script for the test environment.
							 | 
						||
| 
								 | 
							
								        Standard output and error output are saved for future retrieval via the
							 | 
						||
| 
								 | 
							
								        stdout() and stderr() methods.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								          'universal_newlines' parameter controls how the child process
							 | 
						||
| 
								 | 
							
								        input/output streams are opened as defined for the same named Python
							 | 
						||
| 
								 | 
							
								        subprocess.POpen constructor parameter.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								        if chdir:
							 | 
						||
| 
								 | 
							
								            if not os.path.isabs(chdir):
							 | 
						||
| 
								 | 
							
								                chdir = os.path.join(self.workpath(chdir))
							 | 
						||
| 
								 | 
							
								            if self.verbose:
							 | 
						||
| 
								 | 
							
								                sys.stderr.write("chdir(" + chdir + ")\n")
							 | 
						||
| 
								 | 
							
								        else:
							 | 
						||
| 
								 | 
							
								            chdir = self.workdir
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        cmd = []
							 | 
						||
| 
								 | 
							
								        if program and program[0]:
							 | 
						||
| 
								 | 
							
								            if program[0] != self.program[0] and not os.path.isabs(program[0]):
							 | 
						||
| 
								 | 
							
								                program[0] = os.path.join(self._cwd, program[0])
							 | 
						||
| 
								 | 
							
								            cmd += program
							 | 
						||
| 
								 | 
							
								        else:
							 | 
						||
| 
								 | 
							
								            cmd += self.program
							 | 
						||
| 
								 | 
							
								        if arguments:
							 | 
						||
| 
								 | 
							
								            cmd += arguments.split(" ")
							 | 
						||
| 
								 | 
							
								        if self.verbose:
							 | 
						||
| 
								 | 
							
								            sys.stderr.write(join(cmd, " ") + "\n")
							 | 
						||
| 
								 | 
							
								        p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
							 | 
						||
| 
								 | 
							
								            stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=chdir,
							 | 
						||
| 
								 | 
							
								            universal_newlines=universal_newlines)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        if stdin:
							 | 
						||
| 
								 | 
							
								            if type(stdin) is ListType:
							 | 
						||
| 
								 | 
							
								                for line in stdin:
							 | 
						||
| 
								 | 
							
								                    p.tochild.write(line)
							 | 
						||
| 
								 | 
							
								            else:
							 | 
						||
| 
								 | 
							
								                p.tochild.write(stdin)
							 | 
						||
| 
								 | 
							
								        out, err = p.communicate()
							 | 
						||
| 
								 | 
							
								        self._stdout.append(out)
							 | 
						||
| 
								 | 
							
								        self._stderr.append(err)
							 | 
						||
| 
								 | 
							
								        self.status = p.returncode
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        if self.verbose:
							 | 
						||
| 
								 | 
							
								            sys.stdout.write(self._stdout[-1])
							 | 
						||
| 
								 | 
							
								            sys.stderr.write(self._stderr[-1])
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def stderr(self, run=None):
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								          Returns the error output from the specified run number. If there is
							 | 
						||
| 
								 | 
							
								        no specified run number, then returns the error output of the last run.
							 | 
						||
| 
								 | 
							
								        If the run number is less than zero, then returns the error output from
							 | 
						||
| 
								 | 
							
								        that many runs back from the current run.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								        if not run:
							 | 
						||
| 
								 | 
							
								            run = len(self._stderr)
							 | 
						||
| 
								 | 
							
								        elif run < 0:
							 | 
						||
| 
								 | 
							
								            run = len(self._stderr) + run
							 | 
						||
| 
								 | 
							
								        run -= 1
							 | 
						||
| 
								 | 
							
								        if run < 0:
							 | 
						||
| 
								 | 
							
								            return ''
							 | 
						||
| 
								 | 
							
								        return self._stderr[run]
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def stdout(self, run=None):
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								          Returns the standard output from the specified run number. If there
							 | 
						||
| 
								 | 
							
								        is no specified run number, then returns the standard output of the
							 | 
						||
| 
								 | 
							
								        last run. If the run number is less than zero, then returns the
							 | 
						||
| 
								 | 
							
								        standard output from that many runs back from the current run.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								        if not run:
							 | 
						||
| 
								 | 
							
								            run = len(self._stdout)
							 | 
						||
| 
								 | 
							
								        elif run < 0:
							 | 
						||
| 
								 | 
							
								            run = len(self._stdout) + run
							 | 
						||
| 
								 | 
							
								        run -= 1
							 | 
						||
| 
								 | 
							
								        if run < 0:
							 | 
						||
| 
								 | 
							
								            return ''
							 | 
						||
| 
								 | 
							
								        return self._stdout[run]
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def subdir(self, *subdirs):
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								          Create new subdirectories under the temporary working directory, one
							 | 
						||
| 
								 | 
							
								        for each argument. An argument may be a list, in which case the list
							 | 
						||
| 
								 | 
							
								        elements are concatenated using the os.path.join() method.
							 | 
						||
| 
								 | 
							
								        Subdirectories multiple levels deep must be created using a separate
							 | 
						||
| 
								 | 
							
								        argument for each level:
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								            test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        Returns the number of subdirectories actually created.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								        count = 0
							 | 
						||
| 
								 | 
							
								        for sub in subdirs:
							 | 
						||
| 
								 | 
							
								            if sub is None:
							 | 
						||
| 
								 | 
							
								                continue
							 | 
						||
| 
								 | 
							
								            if type(sub) is ListType:
							 | 
						||
| 
								 | 
							
								                sub = apply(os.path.join, tuple(sub))
							 | 
						||
| 
								 | 
							
								            new = os.path.join(self.workdir, sub)
							 | 
						||
| 
								 | 
							
								            try:
							 | 
						||
| 
								 | 
							
								                os.mkdir(new)
							 | 
						||
| 
								 | 
							
								            except:
							 | 
						||
| 
								 | 
							
								                pass
							 | 
						||
| 
								 | 
							
								            else:
							 | 
						||
| 
								 | 
							
								                count += 1
							 | 
						||
| 
								 | 
							
								        return count
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def unlink(self, file):
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								          Unlinks the specified file name. The file name may be a list, in
							 | 
						||
| 
								 | 
							
								        which case the elements are concatenated using the os.path.join()
							 | 
						||
| 
								 | 
							
								        method. The file is assumed to be under the temporary working directory
							 | 
						||
| 
								 | 
							
								        unless it is an absolute path name.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								        if type(file) is ListType:
							 | 
						||
| 
								 | 
							
								            file = apply(os.path.join, tuple(file))
							 | 
						||
| 
								 | 
							
								        if not os.path.isabs(file):
							 | 
						||
| 
								 | 
							
								            file = os.path.join(self.workdir, file)
							 | 
						||
| 
								 | 
							
								        os.unlink(file)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def verbose_set(self, verbose):
							 | 
						||
| 
								 | 
							
								        """Set the verbose level."""
							 | 
						||
| 
								 | 
							
								        self.verbose = verbose
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def workdir_set(self, path):
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								          Creates a temporary working directory with the specified path name.
							 | 
						||
| 
								 | 
							
								        If the path is a null string (''), a unique directory name is created.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								        if os.path.isabs(path):
							 | 
						||
| 
								 | 
							
								            self.workdir = path
							 | 
						||
| 
								 | 
							
								        else:
							 | 
						||
| 
								 | 
							
								            if path != None:
							 | 
						||
| 
								 | 
							
								                if path == '':
							 | 
						||
| 
								 | 
							
								                    path = tempfile.mktemp()
							 | 
						||
| 
								 | 
							
								                if path != None:
							 | 
						||
| 
								 | 
							
								                    os.mkdir(path)
							 | 
						||
| 
								 | 
							
								                self._dirlist.append(path)
							 | 
						||
| 
								 | 
							
								                global _Cleanup
							 | 
						||
| 
								 | 
							
								                try:
							 | 
						||
| 
								 | 
							
								                    _Cleanup.index(self)
							 | 
						||
| 
								 | 
							
								                except ValueError:
							 | 
						||
| 
								 | 
							
								                    _Cleanup.append(self)
							 | 
						||
| 
								 | 
							
								                # We would like to set self.workdir like this:
							 | 
						||
| 
								 | 
							
								                #     self.workdir = path
							 | 
						||
| 
								 | 
							
								                # But symlinks in the path will report things differently from
							 | 
						||
| 
								 | 
							
								                # os.getcwd(), so chdir there and back to fetch the canonical
							 | 
						||
| 
								 | 
							
								                # path.
							 | 
						||
| 
								 | 
							
								                cwd = os.getcwd()
							 | 
						||
| 
								 | 
							
								                os.chdir(path)
							 | 
						||
| 
								 | 
							
								                self.workdir = os.getcwd()
							 | 
						||
| 
								 | 
							
								                os.chdir(cwd)
							 | 
						||
| 
								 | 
							
								            else:
							 | 
						||
| 
								 | 
							
								                self.workdir = None
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def workpath(self, *args):
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								          Returns the absolute path name to a subdirectory or file within the
							 | 
						||
| 
								 | 
							
								        current temporary working directory. Concatenates the temporary working
							 | 
						||
| 
								 | 
							
								        directory name with the specified arguments using os.path.join().
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								        return apply(os.path.join, (self.workdir,) + tuple(args))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def writable(self, top, write):
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								          Make the specified directory tree writable (write == 1) or not
							 | 
						||
| 
								 | 
							
								        (write == None).
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								        def _walk_chmod(arg, dirname, names):
							 | 
						||
| 
								 | 
							
								            st = os.stat(dirname)
							 | 
						||
| 
								 | 
							
								            os.chmod(dirname, arg(st[stat.ST_MODE]))
							 | 
						||
| 
								 | 
							
								            for name in names:
							 | 
						||
| 
								 | 
							
								                fullname = os.path.join(dirname, name)
							 | 
						||
| 
								 | 
							
								                st = os.stat(fullname)
							 | 
						||
| 
								 | 
							
								                os.chmod(fullname, arg(st[stat.ST_MODE]))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        _mode_writable = lambda mode: stat.S_IMODE(mode|0200)
							 | 
						||
| 
								 | 
							
								        _mode_non_writable = lambda mode: stat.S_IMODE(mode&~0200)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        if write:
							 | 
						||
| 
								 | 
							
								            f = _mode_writable
							 | 
						||
| 
								 | 
							
								        else:
							 | 
						||
| 
								 | 
							
								            f = _mode_non_writable
							 | 
						||
| 
								 | 
							
								        try:
							 | 
						||
| 
								 | 
							
								            os.path.walk(top, _walk_chmod, f)
							 | 
						||
| 
								 | 
							
								        except:
							 | 
						||
| 
								 | 
							
								            pass  # Ignore any problems changing modes.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    def write(self, file, content, mode='wb'):
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								          Writes the specified content text (second argument) to the specified
							 | 
						||
| 
								 | 
							
								        file name (first argument). The file name may be a list, in which case
							 | 
						||
| 
								 | 
							
								        the elements are concatenated using the os.path.join() method. The file
							 | 
						||
| 
								 | 
							
								        is created under the temporary working directory. Any subdirectories in
							 | 
						||
| 
								 | 
							
								        the path must already exist. The I/O mode for the file may be specified
							 | 
						||
| 
								 | 
							
								        and must begin with a 'w'. The default is 'wb' (binary write).
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        """
							 | 
						||
| 
								 | 
							
								        if type(file) is ListType:
							 | 
						||
| 
								 | 
							
								            file = apply(os.path.join, tuple(file))
							 | 
						||
| 
								 | 
							
								        if not os.path.isabs(file):
							 | 
						||
| 
								 | 
							
								            file = os.path.join(self.workdir, file)
							 | 
						||
| 
								 | 
							
								        if mode[0] != 'w':
							 | 
						||
| 
								 | 
							
								            raise ValueError, "mode must begin with 'w'"
							 | 
						||
| 
								 | 
							
								        open(file, mode).write(content)
							 |