diff --git a/build/virtualenv_packages.txt b/build/virtualenv_packages.txt index cce82b42b735..045ae1aea8b9 100644 --- a/build/virtualenv_packages.txt +++ b/build/virtualenv_packages.txt @@ -6,6 +6,7 @@ mozilla.pth:python/mozversioncontrol mozilla.pth:third_party/python/blessings mozilla.pth:third_party/python/compare-locales mozilla.pth:third_party/python/configobj +mozilla.pth:third_party/python/cram mozilla.pth:third_party/python/dlmanager mozilla.pth:third_party/python/futures mozilla.pth:third_party/python/jsmin diff --git a/third_party/python/cram/cram/__init__.py b/third_party/python/cram/cram/__init__.py new file mode 100644 index 000000000000..4b626c4027a4 --- /dev/null +++ b/third_party/python/cram/cram/__init__.py @@ -0,0 +1,6 @@ +"""Functional testing framework for command line applications""" + +from cram._main import main +from cram._test import test, testfile + +__all__ = ['main', 'test', 'testfile'] diff --git a/third_party/python/cram/cram/__main__.py b/third_party/python/cram/cram/__main__.py new file mode 100644 index 000000000000..e6b0aef978f4 --- /dev/null +++ b/third_party/python/cram/cram/__main__.py @@ -0,0 +1,10 @@ +"""Main module (invoked by "python -m cram")""" + +import sys + +import cram + +try: + sys.exit(cram.main(sys.argv[1:])) +except KeyboardInterrupt: + pass diff --git a/third_party/python/cram/cram/_cli.py b/third_party/python/cram/cram/_cli.py new file mode 100644 index 000000000000..8333b6b95102 --- /dev/null +++ b/third_party/python/cram/cram/_cli.py @@ -0,0 +1,134 @@ +"""The command line interface implementation""" + +import os +import sys + +from cram._encoding import b, bytestype, stdoutb +from cram._process import execute + +__all__ = ['runcli'] + +def _prompt(question, answers, auto=None): + """Write a prompt to stdout and ask for answer in stdin. + + answers should be a string, with each character a single + answer. An uppercase letter is considered the default answer. + + If an invalid answer is given, this asks again until it gets a + valid one. + + If auto is set, the question is answered automatically with the + specified value. + """ + default = [c for c in answers if c.isupper()] + while True: + sys.stdout.write('%s [%s] ' % (question, answers)) + sys.stdout.flush() + if auto is not None: + sys.stdout.write(auto + '\n') + sys.stdout.flush() + return auto + + answer = sys.stdin.readline().strip().lower() + if not answer and default: + return default[0] + elif answer and answer in answers.lower(): + return answer + +def _log(msg=None, verbosemsg=None, verbose=False): + """Write msg to standard out and flush. + + If verbose is True, write verbosemsg instead. + """ + if verbose: + msg = verbosemsg + if msg: + if isinstance(msg, bytestype): + stdoutb.write(msg) + else: # pragma: nocover + sys.stdout.write(msg) + sys.stdout.flush() + +def _patch(cmd, diff): + """Run echo [lines from diff] | cmd -p0""" + out, retcode = execute([cmd, '-p0'], stdin=b('').join(diff)) + return retcode == 0 + +def runcli(tests, quiet=False, verbose=False, patchcmd=None, answer=None): + """Run tests with command line interface input/output. + + tests should be a sequence of 2-tuples containing the following: + + (test path, test function) + + This function yields a new sequence where each test function is wrapped + with a function that handles CLI input/output. + + If quiet is True, diffs aren't printed. If verbose is True, + filenames and status information are printed. + + If patchcmd is set, a prompt is written to stdout asking if + changed output should be merged back into the original test. The + answer is read from stdin. If 'y', the test is patched using patch + based on the changed output. + """ + total, skipped, failed = [0], [0], [0] + + for path, test in tests: + def testwrapper(): + """Test function that adds CLI output""" + total[0] += 1 + _log(None, path + b(': '), verbose) + + refout, postout, diff = test() + if refout is None: + skipped[0] += 1 + _log('s', 'empty\n', verbose) + return refout, postout, diff + + abspath = os.path.abspath(path) + errpath = abspath + b('.err') + + if postout is None: + skipped[0] += 1 + _log('s', 'skipped\n', verbose) + elif not diff: + _log('.', 'passed\n', verbose) + if os.path.exists(errpath): + os.remove(errpath) + else: + failed[0] += 1 + _log('!', 'failed\n', verbose) + if not quiet: + _log('\n', None, verbose) + + errfile = open(errpath, 'wb') + try: + for line in postout: + errfile.write(line) + finally: + errfile.close() + + if not quiet: + origdiff = diff + diff = [] + for line in origdiff: + stdoutb.write(line) + diff.append(line) + + if (patchcmd and + _prompt('Accept this change?', 'yN', answer) == 'y'): + if _patch(patchcmd, diff): + _log(None, path + b(': merged output\n'), verbose) + os.remove(errpath) + else: + _log(path + b(': merge failed\n')) + + return refout, postout, diff + + yield (path, testwrapper) + + if total[0] > 0: + _log('\n', None, verbose) + _log('# Ran %s tests, %s skipped, %s failed.\n' + % (total[0], skipped[0], failed[0])) diff --git a/third_party/python/cram/cram/_diff.py b/third_party/python/cram/cram/_diff.py new file mode 100644 index 000000000000..487730508272 --- /dev/null +++ b/third_party/python/cram/cram/_diff.py @@ -0,0 +1,158 @@ +"""Utilities for diffing test files and their output""" + +import codecs +import difflib +import re + +from cram._encoding import b + +__all__ = ['esc', 'glob', 'regex', 'unified_diff'] + +def _regex(pattern, s): + """Match a regular expression or return False if invalid. + + >>> from cram._encoding import b + >>> [bool(_regex(r, b('foobar'))) for r in (b('foo.*'), b('***'))] + [True, False] + """ + try: + return re.match(pattern + b(r'\Z'), s) + except re.error: + return False + +def _glob(el, l): + r"""Match a glob-like pattern. + + The only supported special characters are * and ?. Escaping is + supported. + + >>> from cram._encoding import b + >>> bool(_glob(b(r'\* \\ \? fo?b*'), b('* \\ ? foobar'))) + True + """ + i, n = 0, len(el) + res = b('') + while i < n: + c = el[i:i + 1] + i += 1 + if c == b('\\') and el[i] in b('*?\\'): + res += el[i - 1:i + 1] + i += 1 + elif c == b('*'): + res += b('.*') + elif c == b('?'): + res += b('.') + else: + res += re.escape(c) + return _regex(res, l) + +def _matchannotation(keyword, matchfunc, el, l): + """Apply match function based on annotation keyword""" + ann = b(' (%s)\n' % keyword) + return el.endswith(ann) and matchfunc(el[:-len(ann)], l[:-1]) + +def regex(el, l): + """Apply a regular expression match to a line annotated with '(re)'""" + return _matchannotation('re', _regex, el, l) + +def glob(el, l): + """Apply a glob match to a line annotated with '(glob)'""" + return _matchannotation('glob', _glob, el, l) + +def esc(el, l): + """Apply an escape match to a line annotated with '(esc)'""" + ann = b(' (esc)\n') + + if el.endswith(ann): + el = codecs.escape_decode(el[:-len(ann)])[0] + b('\n') + if el == l: + return True + + if l.endswith(ann): + l = codecs.escape_decode(l[:-len(ann)])[0] + b('\n') + return el == l + +class _SequenceMatcher(difflib.SequenceMatcher, object): + """Like difflib.SequenceMatcher, but supports custom match functions""" + def __init__(self, *args, **kwargs): + self._matchers = kwargs.pop('matchers', []) + super(_SequenceMatcher, self).__init__(*args, **kwargs) + + def _match(self, el, l): + """Tests for matching lines using custom matchers""" + for matcher in self._matchers: + if matcher(el, l): + return True + return False + + def find_longest_match(self, alo, ahi, blo, bhi): + """Find longest matching block in a[alo:ahi] and b[blo:bhi]""" + # SequenceMatcher uses find_longest_match() to slowly whittle down + # the differences between a and b until it has each matching block. + # Because of this, we can end up doing the same matches many times. + matches = [] + for n, (el, line) in enumerate(zip(self.a[alo:ahi], self.b[blo:bhi])): + if el != line and self._match(el, line): + # This fools the superclass's method into thinking that the + # regex/glob in a is identical to b by replacing a's line (the + # expected output) with b's line (the actual output). + self.a[alo + n] = line + matches.append((n, el)) + ret = super(_SequenceMatcher, self).find_longest_match(alo, ahi, + blo, bhi) + # Restore the lines replaced above. Otherwise, the diff output + # would seem to imply that the tests never had any regexes/globs. + for n, el in matches: + self.a[alo + n] = el + return ret + +def unified_diff(l1, l2, fromfile=b(''), tofile=b(''), fromfiledate=b(''), + tofiledate=b(''), n=3, lineterm=b('\n'), matchers=None): + r"""Compare two sequences of lines; generate the delta as a unified diff. + + This is like difflib.unified_diff(), but allows custom matchers. + + >>> from cram._encoding import b + >>> l1 = [b('a\n'), b('? (glob)\n')] + >>> l2 = [b('a\n'), b('b\n')] + >>> (list(unified_diff(l1, l2, b('f1'), b('f2'), b('1970-01-01'), + ... b('1970-01-02'))) == + ... [b('--- f1\t1970-01-01\n'), b('+++ f2\t1970-01-02\n'), + ... b('@@ -1,2 +1,2 @@\n'), b(' a\n'), b('-? (glob)\n'), b('+b\n')]) + True + + >>> from cram._diff import glob + >>> list(unified_diff(l1, l2, matchers=[glob])) + [] + """ + if matchers is None: + matchers = [] + started = False + matcher = _SequenceMatcher(None, l1, l2, matchers=matchers) + for group in matcher.get_grouped_opcodes(n): + if not started: + if fromfiledate: + fromdate = b('\t') + fromfiledate + else: + fromdate = b('') + if tofiledate: + todate = b('\t') + tofiledate + else: + todate = b('') + yield b('--- ') + fromfile + fromdate + lineterm + yield b('+++ ') + tofile + todate + lineterm + started = True + i1, i2, j1, j2 = group[0][1], group[-1][2], group[0][3], group[-1][4] + yield (b("@@ -%d,%d +%d,%d @@" % (i1 + 1, i2 - i1, j1 + 1, j2 - j1)) + + lineterm) + for tag, i1, i2, j1, j2 in group: + if tag == 'equal': + for line in l1[i1:i2]: + yield b(' ') + line + continue + if tag == 'replace' or tag == 'delete': + for line in l1[i1:i2]: + yield b('-') + line + if tag == 'replace' or tag == 'insert': + for line in l2[j1:j2]: + yield b('+') + line diff --git a/third_party/python/cram/cram/_encoding.py b/third_party/python/cram/cram/_encoding.py new file mode 100644 index 000000000000..d639ccee19fd --- /dev/null +++ b/third_party/python/cram/cram/_encoding.py @@ -0,0 +1,106 @@ +"""Encoding utilities""" + +import os +import sys + +try: + import builtins +except ImportError: + import __builtin__ as builtins + +__all__ = ['b', 'bchr', 'bytestype', 'envencode', 'fsdecode', 'fsencode', + 'stdoutb', 'stderrb', 'u', 'ul', 'unicodetype'] + +bytestype = getattr(builtins, 'bytes', str) +unicodetype = getattr(builtins, 'unicode', str) + +if getattr(os, 'fsdecode', None) is not None: + fsdecode = os.fsdecode + fsencode = os.fsencode +elif bytestype is not str: + if sys.platform == 'win32': + def fsdecode(s): + """Decode a filename from the filesystem encoding""" + if isinstance(s, unicodetype): + return s + encoding = sys.getfilesystemencoding() + if encoding == 'mbcs': + return s.decode(encoding) + else: + return s.decode(encoding, 'surrogateescape') + + def fsencode(s): + """Encode a filename to the filesystem encoding""" + if isinstance(s, bytestype): + return s + encoding = sys.getfilesystemencoding() + if encoding == 'mbcs': + return s.encode(encoding) + else: + return s.encode(encoding, 'surrogateescape') + else: + def fsdecode(s): + """Decode a filename from the filesystem encoding""" + if isinstance(s, unicodetype): + return s + return s.decode(sys.getfilesystemencoding(), 'surrogateescape') + + def fsencode(s): + """Encode a filename to the filesystem encoding""" + if isinstance(s, bytestype): + return s + return s.encode(sys.getfilesystemencoding(), 'surrogateescape') +else: + def fsdecode(s): + """Decode a filename from the filesystem encoding""" + return s + + def fsencode(s): + """Encode a filename to the filesystem encoding""" + return s + +if bytestype is str: + def envencode(s): + """Encode a byte string to the os.environ encoding""" + return s +else: + envencode = fsdecode + +if getattr(sys.stdout, 'buffer', None) is not None: + stdoutb = sys.stdout.buffer + stderrb = sys.stderr.buffer +else: + stdoutb = sys.stdout + stderrb = sys.stderr + +if bytestype is str: + def b(s): + """Convert an ASCII string literal into a bytes object""" + return s + + bchr = chr + + def u(s): + """Convert an ASCII string literal into a unicode object""" + return s.decode('ascii') +else: + def b(s): + """Convert an ASCII string literal into a bytes object""" + return s.encode('ascii') + + def bchr(i): + """Return a bytes character for a given integer value""" + return bytestype([i]) + + def u(s): + """Convert an ASCII string literal into a unicode object""" + return s + +try: + eval(r'u""') +except SyntaxError: + ul = eval +else: + def ul(e): + """Evaluate e as a unicode string literal""" + return eval('u' + e) diff --git a/third_party/python/cram/cram/_main.py b/third_party/python/cram/cram/_main.py new file mode 100644 index 000000000000..11d457bb1658 --- /dev/null +++ b/third_party/python/cram/cram/_main.py @@ -0,0 +1,211 @@ +"""Main entry point""" + +import optparse +import os +import shlex +import shutil +import sys +import tempfile + +try: + import configparser +except ImportError: # pragma: nocover + import ConfigParser as configparser + +from cram._cli import runcli +from cram._encoding import b, fsencode, stderrb, stdoutb +from cram._run import runtests +from cram._xunit import runxunit + +def _which(cmd): + """Return the path to cmd or None if not found""" + cmd = fsencode(cmd) + for p in os.environ['PATH'].split(os.pathsep): + path = os.path.join(fsencode(p), cmd) + if os.path.isfile(path) and os.access(path, os.X_OK): + return os.path.abspath(path) + return None + +def _expandpath(path): + """Expands ~ and environment variables in path""" + return os.path.expanduser(os.path.expandvars(path)) + +class _OptionParser(optparse.OptionParser): + """Like optparse.OptionParser, but supports setting values through + CRAM= and .cramrc.""" + + def __init__(self, *args, **kwargs): + self._config_opts = {} + optparse.OptionParser.__init__(self, *args, **kwargs) + + def add_option(self, *args, **kwargs): + option = optparse.OptionParser.add_option(self, *args, **kwargs) + if option.dest and option.dest != 'version': + key = option.dest.replace('_', '-') + self._config_opts[key] = option.action == 'store_true' + return option + + def parse_args(self, args=None, values=None): + config = configparser.RawConfigParser() + config.read(_expandpath(os.environ.get('CRAMRC', '.cramrc'))) + defaults = {} + for key, isbool in self._config_opts.items(): + try: + if isbool: + try: + value = config.getboolean('cram', key) + except ValueError: + value = config.get('cram', key) + self.error('--%s: invalid boolean value: %r' + % (key, value)) + else: + value = config.get('cram', key) + except (configparser.NoSectionError, configparser.NoOptionError): + pass + else: + defaults[key] = value + self.set_defaults(**defaults) + + eargs = os.environ.get('CRAM', '').strip() + if eargs: + args = args or [] + args += shlex.split(eargs) + + try: + return optparse.OptionParser.parse_args(self, args, values) + except optparse.OptionValueError: + self.error(str(sys.exc_info()[1])) + +def _parseopts(args): + """Parse command line arguments""" + p = _OptionParser(usage='cram [OPTIONS] TESTS...', prog='cram') + p.add_option('-V', '--version', action='store_true', + help='show version information and exit') + p.add_option('-q', '--quiet', action='store_true', + help="don't print diffs") + p.add_option('-v', '--verbose', action='store_true', + help='show filenames and test status') + p.add_option('-i', '--interactive', action='store_true', + help='interactively merge changed test output') + p.add_option('-d', '--debug', action='store_true', + help='write script output directly to the terminal') + p.add_option('-y', '--yes', action='store_true', + help='answer yes to all questions') + p.add_option('-n', '--no', action='store_true', + help='answer no to all questions') + p.add_option('-E', '--preserve-env', action='store_true', + help="don't reset common environment variables") + p.add_option('--keep-tmpdir', action='store_true', + help='keep temporary directories') + p.add_option('--shell', action='store', default='/bin/sh', metavar='PATH', + help='shell to use for running tests (default: %default)') + p.add_option('--shell-opts', action='store', metavar='OPTS', + help='arguments to invoke shell with') + p.add_option('--indent', action='store', default=2, metavar='NUM', + type='int', help=('number of spaces to use for indentation ' + '(default: %default)')) + p.add_option('--xunit-file', action='store', metavar='PATH', + help='path to write xUnit XML output') + opts, paths = p.parse_args(args) + paths = [fsencode(path) for path in paths] + return opts, paths, p.get_usage + +def main(args): + """Main entry point. + + If you're thinking of using Cram in other Python code (e.g., unit tests), + consider using the test() or testfile() functions instead. + + :param args: Script arguments (excluding script name) + :type args: str + :return: Exit code (non-zero on failure) + :rtype: int + """ + opts, paths, getusage = _parseopts(args) + if opts.version: + sys.stdout.write("""Cram CLI testing framework (version 0.7) + +Copyright (C) 2010-2016 Brodie Rao and others +This is free software; see the source for copying conditions. There is NO +warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +""") + return + + conflicts = [('--yes', opts.yes, '--no', opts.no), + ('--quiet', opts.quiet, '--interactive', opts.interactive), + ('--debug', opts.debug, '--quiet', opts.quiet), + ('--debug', opts.debug, '--interactive', opts.interactive), + ('--debug', opts.debug, '--verbose', opts.verbose), + ('--debug', opts.debug, '--xunit-file', opts.xunit_file)] + for s1, o1, s2, o2 in conflicts: + if o1 and o2: + sys.stderr.write('options %s and %s are mutually exclusive\n' + % (s1, s2)) + return 2 + + shellcmd = _which(opts.shell) + if not shellcmd: + stderrb.write(b('shell not found: ') + fsencode(opts.shell) + b('\n')) + return 2 + shell = [shellcmd] + if opts.shell_opts: + shell += shlex.split(opts.shell_opts) + + patchcmd = None + if opts.interactive: + patchcmd = _which('patch') + if not patchcmd: + sys.stderr.write('patch(1) required for -i\n') + return 2 + + if not paths: + sys.stdout.write(getusage()) + return 2 + + badpaths = [path for path in paths if not os.path.exists(path)] + if badpaths: + stderrb.write(b('no such file: ') + badpaths[0] + b('\n')) + return 2 + + if opts.yes: + answer = 'y' + elif opts.no: + answer = 'n' + else: + answer = None + + tmpdir = os.environ['CRAMTMP'] = tempfile.mkdtemp('', 'cramtests-') + tmpdirb = fsencode(tmpdir) + proctmp = os.path.join(tmpdir, 'tmp') + for s in ('TMPDIR', 'TEMP', 'TMP'): + os.environ[s] = proctmp + + os.mkdir(proctmp) + try: + tests = runtests(paths, tmpdirb, shell, indent=opts.indent, + cleanenv=not opts.preserve_env, debug=opts.debug) + if not opts.debug: + tests = runcli(tests, quiet=opts.quiet, verbose=opts.verbose, + patchcmd=patchcmd, answer=answer) + if opts.xunit_file is not None: + tests = runxunit(tests, opts.xunit_file) + + hastests = False + failed = False + for path, test in tests: + hastests = True + refout, postout, diff = test() + if diff: + failed = True + + if not hastests: + sys.stderr.write('no tests found\n') + return 2 + + return int(failed) + finally: + if opts.keep_tmpdir: + stdoutb.write(b('# Kept temporary directory: ') + tmpdirb + + b('\n')) + else: + shutil.rmtree(tmpdir) diff --git a/third_party/python/cram/cram/_process.py b/third_party/python/cram/cram/_process.py new file mode 100644 index 000000000000..decdfbc3a708 --- /dev/null +++ b/third_party/python/cram/cram/_process.py @@ -0,0 +1,54 @@ +"""Utilities for running subprocesses""" + +import os +import signal +import subprocess +import sys + +from cram._encoding import fsdecode + +__all__ = ['PIPE', 'STDOUT', 'execute'] + +PIPE = subprocess.PIPE +STDOUT = subprocess.STDOUT + +def _makeresetsigpipe(): + """Make a function to reset SIGPIPE to SIG_DFL (for use in subprocesses). + + Doing subprocess.Popen(..., preexec_fn=makeresetsigpipe()) will prevent + Python's SIGPIPE handler (SIG_IGN) from being inherited by the + child process. + """ + if (sys.platform == 'win32' or + getattr(signal, 'SIGPIPE', None) is None): # pragma: nocover + return None + return lambda: signal.signal(signal.SIGPIPE, signal.SIG_DFL) + +def execute(args, stdin=None, stdout=None, stderr=None, cwd=None, env=None): + """Run a process and return its output and return code. + + stdin may either be None or a string to send to the process. + + stdout may either be None or PIPE. If set to PIPE, the process's output + is returned as a string. + + stderr may either be None or STDOUT. If stdout is set to PIPE and stderr + is set to STDOUT, the process's stderr output will be interleaved with + stdout and returned as a string. + + cwd sets the process's current working directory. + + env can be set to a dictionary to override the process's environment + variables. + + This function returns a 2-tuple of (output, returncode). + """ + if sys.platform == 'win32': # pragma: nocover + args = [fsdecode(arg) for arg in args] + + p = subprocess.Popen(args, stdin=PIPE, stdout=stdout, stderr=stderr, + cwd=cwd, env=env, bufsize=-1, + preexec_fn=_makeresetsigpipe(), + close_fds=os.name == 'posix') + out, err = p.communicate(stdin) + return out, p.returncode diff --git a/third_party/python/cram/cram/_run.py b/third_party/python/cram/cram/_run.py new file mode 100644 index 000000000000..9111c0f686bf --- /dev/null +++ b/third_party/python/cram/cram/_run.py @@ -0,0 +1,77 @@ +"""The test runner""" + +import os +import sys + +from cram._encoding import b, fsdecode, fsencode +from cram._test import testfile + +__all__ = ['runtests'] + +if sys.platform == 'win32': # pragma: nocover + def _walk(top): + top = fsdecode(top) + for root, dirs, files in os.walk(top): + yield (fsencode(root), + [fsencode(p) for p in dirs], + [fsencode(p) for p in files]) +else: + _walk = os.walk + +def _findtests(paths): + """Yield tests in paths in sorted order""" + for p in paths: + if os.path.isdir(p): + for root, dirs, files in _walk(p): + if os.path.basename(root).startswith(b('.')): + continue + for f in sorted(files): + if not f.startswith(b('.')) and f.endswith(b('.t')): + yield os.path.normpath(os.path.join(root, f)) + else: + yield os.path.normpath(p) + +def runtests(paths, tmpdir, shell, indent=2, cleanenv=True, debug=False): + """Run tests and yield results. + + This yields a sequence of 2-tuples containing the following: + + (test path, test function) + + The test function, when called, runs the test in a temporary directory + and returns a 3-tuple: + + (list of lines in the test, same list with actual output, diff) + """ + cwd = os.getcwd() + seen = set() + basenames = set() + for i, path in enumerate(_findtests(paths)): + abspath = os.path.abspath(path) + if abspath in seen: + continue + seen.add(abspath) + + if not os.stat(path).st_size: + yield (path, lambda: (None, None, None)) + continue + + basename = os.path.basename(path) + if basename in basenames: + basename = basename + b('-%s' % i) + else: + basenames.add(basename) + + def test(): + """Run test file""" + testdir = os.path.join(tmpdir, basename) + os.mkdir(testdir) + try: + os.chdir(testdir) + return testfile(abspath, shell, indent=indent, + cleanenv=cleanenv, debug=debug, + testname=path) + finally: + os.chdir(cwd) + + yield (path, test) diff --git a/third_party/python/cram/cram/_test.py b/third_party/python/cram/cram/_test.py new file mode 100644 index 000000000000..27ef99c597a5 --- /dev/null +++ b/third_party/python/cram/cram/_test.py @@ -0,0 +1,230 @@ +"""Utilities for running individual tests""" + +import itertools +import os +import re +import time + +from cram._encoding import b, bchr, bytestype, envencode, unicodetype +from cram._diff import esc, glob, regex, unified_diff +from cram._process import PIPE, STDOUT, execute + +__all__ = ['test', 'testfile'] + +_needescape = re.compile(b(r'[\x00-\x09\x0b-\x1f\x7f-\xff]')).search +_escapesub = re.compile(b(r'[\x00-\x09\x0b-\x1f\\\x7f-\xff]')).sub +_escapemap = dict((bchr(i), b(r'\x%02x' % i)) for i in range(256)) +_escapemap.update({b('\\'): b('\\\\'), b('\r'): b(r'\r'), b('\t'): b(r'\t')}) + +def _escape(s): + """Like the string-escape codec, but doesn't escape quotes""" + return (_escapesub(lambda m: _escapemap[m.group(0)], s[:-1]) + + b(' (esc)\n')) + +def test(lines, shell='/bin/sh', indent=2, testname=None, env=None, + cleanenv=True, debug=False): + r"""Run test lines and return input, output, and diff. + + This returns a 3-tuple containing the following: + + (list of lines in test, same list with actual output, diff) + + diff is a generator that yields the diff between the two lists. + + If a test exits with return code 80, the actual output is set to + None and diff is set to []. + + Note that the TESTSHELL environment variable is available in the + test (set to the specified shell). However, the TESTDIR and + TESTFILE environment variables are not available. To run actual + test files, see testfile(). + + Example usage: + + >>> from cram._encoding import b + >>> refout, postout, diff = test([b(' $ echo hi\n'), + ... b(' [a-z]{2} (re)\n')]) + >>> refout == [b(' $ echo hi\n'), b(' [a-z]{2} (re)\n')] + True + >>> postout == [b(' $ echo hi\n'), b(' hi\n')] + True + >>> bool(diff) + False + + lines may also be a single bytes string: + + >>> refout, postout, diff = test(b(' $ echo hi\n bye\n')) + >>> refout == [b(' $ echo hi\n'), b(' bye\n')] + True + >>> postout == [b(' $ echo hi\n'), b(' hi\n')] + True + >>> bool(diff) + True + >>> (b('').join(diff) == + ... b('--- \n+++ \n@@ -1,2 +1,2 @@\n $ echo hi\n- bye\n+ hi\n')) + True + + Note that the b() function is internal to Cram. If you're using Python 2, + use normal string literals instead. If you're using Python 3, use bytes + literals. + + :param lines: Test input + :type lines: bytes or collections.Iterable[bytes] + :param shell: Shell to run test in + :type shell: bytes or str or list[bytes] or list[str] + :param indent: Amount of indentation to use for shell commands + :type indent: int + :param testname: Optional test file name (used in diff output) + :type testname: bytes or None + :param env: Optional environment variables for the test shell + :type env: dict or None + :param cleanenv: Whether or not to sanitize the environment + :type cleanenv: bool + :param debug: Whether or not to run in debug mode (don't capture stdout) + :type debug: bool + :return: Input, output, and diff iterables + :rtype: (list[bytes], list[bytes], collections.Iterable[bytes]) + """ + indent = b(' ') * indent + cmdline = indent + b('$ ') + conline = indent + b('> ') + usalt = 'CRAM%s' % time.time() + salt = b(usalt) + + if env is None: + env = os.environ.copy() + + if cleanenv: + for s in ('LANG', 'LC_ALL', 'LANGUAGE'): + env[s] = 'C' + env['TZ'] = 'GMT' + env['CDPATH'] = '' + env['COLUMNS'] = '80' + env['GREP_OPTIONS'] = '' + + if isinstance(lines, bytestype): + lines = lines.splitlines(True) + + if isinstance(shell, (bytestype, unicodetype)): + shell = [shell] + env['TESTSHELL'] = shell[0] + + if debug: + stdin = [] + for line in lines: + if not line.endswith(b('\n')): + line += b('\n') + if line.startswith(cmdline): + stdin.append(line[len(cmdline):]) + elif line.startswith(conline): + stdin.append(line[len(conline):]) + + execute(shell + ['-'], stdin=b('').join(stdin), env=env) + return ([], [], []) + + after = {} + refout, postout = [], [] + i = pos = prepos = -1 + stdin = [] + for i, line in enumerate(lines): + if not line.endswith(b('\n')): + line += b('\n') + refout.append(line) + if line.startswith(cmdline): + after.setdefault(pos, []).append(line) + prepos = pos + pos = i + stdin.append(b('echo %s %s $?\n' % (usalt, i))) + stdin.append(line[len(cmdline):]) + elif line.startswith(conline): + after.setdefault(prepos, []).append(line) + stdin.append(line[len(conline):]) + elif not line.startswith(indent): + after.setdefault(pos, []).append(line) + stdin.append(b('echo %s %s $?\n' % (usalt, i + 1))) + + output, retcode = execute(shell + ['-'], stdin=b('').join(stdin), + stdout=PIPE, stderr=STDOUT, env=env) + if retcode == 80: + return (refout, None, []) + + pos = -1 + ret = 0 + for i, line in enumerate(output[:-1].splitlines(True)): + out, cmd = line, None + if salt in line: + out, cmd = line.split(salt, 1) + + if out: + if not out.endswith(b('\n')): + out += b(' (no-eol)\n') + + if _needescape(out): + out = _escape(out) + postout.append(indent + out) + + if cmd: + ret = int(cmd.split()[1]) + if ret != 0: + postout.append(indent + b('[%s]\n' % (ret))) + postout += after.pop(pos, []) + pos = int(cmd.split()[0]) + + postout += after.pop(pos, []) + + if testname: + diffpath = testname + errpath = diffpath + b('.err') + else: + diffpath = errpath = b('') + diff = unified_diff(refout, postout, diffpath, errpath, + matchers=[esc, glob, regex]) + for firstline in diff: + return refout, postout, itertools.chain([firstline], diff) + return refout, postout, [] + +def testfile(path, shell='/bin/sh', indent=2, env=None, cleanenv=True, + debug=False, testname=None): + """Run test at path and return input, output, and diff. + + This returns a 3-tuple containing the following: + + (list of lines in test, same list with actual output, diff) + + diff is a generator that yields the diff between the two lists. + + If a test exits with return code 80, the actual output is set to + None and diff is set to []. + + Note that the TESTDIR, TESTFILE, and TESTSHELL environment + variables are available to use in the test. + + :param path: Path to test file + :type path: bytes or str + :param shell: Shell to run test in + :type shell: bytes or str or list[bytes] or list[str] + :param indent: Amount of indentation to use for shell commands + :type indent: int + :param env: Optional environment variables for the test shell + :type env: dict or None + :param cleanenv: Whether or not to sanitize the environment + :type cleanenv: bool + :param debug: Whether or not to run in debug mode (don't capture stdout) + :type debug: bool + :param testname: Optional test file name (used in diff output) + :type testname: bytes or None + :return: Input, output, and diff iterables + :rtype: (list[bytes], list[bytes], collections.Iterable[bytes]) + """ + f = open(path, 'rb') + try: + abspath = os.path.abspath(path) + env = env or os.environ.copy() + env['TESTDIR'] = envencode(os.path.dirname(abspath)) + env['TESTFILE'] = envencode(os.path.basename(abspath)) + if testname is None: # pragma: nocover + testname = os.path.basename(abspath) + return test(f, shell, indent=indent, testname=testname, env=env, + cleanenv=cleanenv, debug=debug) + finally: + f.close() diff --git a/third_party/python/cram/cram/_xunit.py b/third_party/python/cram/cram/_xunit.py new file mode 100644 index 000000000000..0b3cb49cfc84 --- /dev/null +++ b/third_party/python/cram/cram/_xunit.py @@ -0,0 +1,173 @@ +"""xUnit XML output""" + +import locale +import os +import re +import socket +import sys +import time + +from cram._encoding import u, ul + +__all__ = ['runxunit'] + +_widecdataregex = ul(r"'(?:[^\x09\x0a\x0d\x20-\ud7ff\ue000-\ufffd" + r"\U00010000-\U0010ffff]|]]>)'") +_narrowcdataregex = ul(r"'(?:[^\x09\x0a\x0d\x20-\ud7ff\ue000-\ufffd]" + r"|]]>)'") +_widequoteattrregex = ul(r"'[^\x20\x21\x23-\x25\x27-\x3b\x3d" + r"\x3f-\ud7ff\ue000-\ufffd" + r"\U00010000-\U0010ffff]'") +_narrowquoteattrregex = ul(r"'[^\x20\x21\x23-\x25\x27-\x3b\x3d" + r"\x3f-\ud7ff\ue000-\ufffd]'") +_replacementchar = ul(r"'\N{REPLACEMENT CHARACTER}'") + +if sys.maxunicode >= 0x10ffff: # pragma: nocover + _cdatasub = re.compile(_widecdataregex).sub + _quoteattrsub = re.compile(_widequoteattrregex).sub +else: # pragma: nocover + _cdatasub = re.compile(_narrowcdataregex).sub + _quoteattrsub = re.compile(_narrowquoteattrregex).sub + +def _cdatareplace(m): + """Replace _cdatasub() regex match""" + if m.group(0) == u(']]>'): + return u(']]>]]>>> from cram._encoding import ul + >>> (_cdata('1<\'2\'>&"3\x00]]>\t\r\n') == + ... ul(r"'&\"3\ufffd]]>]]>'")) + True + """ + return u('') % _cdatasub(_cdatareplace, s) + +def _quoteattrreplace(m): + """Replace _quoteattrsub() regex match""" + return {u('\t'): u(' '), + u('\n'): u(' '), + u('\r'): u(' '), + u('"'): u('"'), + u('&'): u('&'), + u('<'): u('<'), + u('>'): u('>')}.get(m.group(0), _replacementchar) + +def _quoteattr(s): + r"""Escape a string for use as an XML attribute value. + + >>> from cram._encoding import ul + >>> (_quoteattr('1<\'2\'>&"3\x00]]>\t\r\n') == + ... ul(r"'\"1<\'2\'>&"3\ufffd]]> \"'")) + True + """ + return u('"%s"') % _quoteattrsub(_quoteattrreplace, s) + +def _timestamp(): + """Return the current time in ISO 8601 format""" + tm = time.localtime() + if tm.tm_isdst == 1: # pragma: nocover + tz = time.altzone + else: # pragma: nocover + tz = time.timezone + + timestamp = time.strftime('%Y-%m-%dT%H:%M:%S', tm) + tzhours = int(-tz / 60 / 60) + tzmins = int(abs(tz) / 60 % 60) + timestamp += u('%+03d:%02d') % (tzhours, tzmins) + return timestamp + +def runxunit(tests, xmlpath): + """Run tests with xUnit XML output. + + tests should be a sequence of 2-tuples containing the following: + + (test path, test function) + + This function yields a new sequence where each test function is wrapped + with a function that writes test results to an xUnit XML file. + """ + suitestart = time.time() + timestamp = _timestamp() + hostname = socket.gethostname() + total, skipped, failed = [0], [0], [0] + testcases = [] + + for path, test in tests: + def testwrapper(): + """Run test and collect XML output""" + total[0] += 1 + + start = time.time() + refout, postout, diff = test() + testtime = time.time() - start + + classname = path.decode(locale.getpreferredencoding(), 'replace') + name = os.path.basename(classname) + + if postout is None: + skipped[0] += 1 + testcase = (u(' \n' + ' \n' + ' \n') % + {'classname': _quoteattr(classname), + 'name': _quoteattr(name), + 'time': testtime}) + elif diff: + failed[0] += 1 + diff = list(diff) + diffu = u('').join(l.decode(locale.getpreferredencoding(), + 'replace') + for l in diff) + testcase = (u(' \n' + ' %(diff)s\n' + ' \n') % + {'classname': _quoteattr(classname), + 'name': _quoteattr(name), + 'time': testtime, + 'diff': _cdata(diffu)}) + else: + testcase = (u(' \n') % + {'classname': _quoteattr(classname), + 'name': _quoteattr(name), + 'time': testtime}) + testcases.append(testcase) + + return refout, postout, diff + + yield path, testwrapper + + suitetime = time.time() - suitestart + header = (u('\n' + '\n') % + {'total': total[0], + 'failed': failed[0], + 'skipped': skipped[0], + 'timestamp': _quoteattr(timestamp), + 'hostname': _quoteattr(hostname), + 'time': suitetime}) + footer = u('\n') + + xmlfile = open(xmlpath, 'wb') + try: + xmlfile.write(header.encode('utf-8')) + for testcase in testcases: + xmlfile.write(testcase.encode('utf-8')) + xmlfile.write(footer.encode('utf-8')) + finally: + xmlfile.close()