Initial commit

This commit is contained in:
Untriex Programming
2021-08-31 22:06:02 +02:00
commit 9b6723e11e
5142 changed files with 1455625 additions and 0 deletions

View File

@@ -0,0 +1,13 @@
# Copyright 2017 The Abseil Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -0,0 +1,2 @@
# The presence of this module means, at build time, Bazel used Python 3
# when resolving select() calls based on Python version.

View File

@@ -0,0 +1,80 @@
# Copyright 2017 The Abseil Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Internal helper for running tests on Windows Bazel."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
from absl import flags
try:
from absl.testing import _bazel_selected_py3
except ImportError:
_bazel_selected_py3 = None
FLAGS = flags.FLAGS
def get_executable_path(py_binary_name):
"""Returns the executable path of a py_binary.
This returns the executable path of a py_binary that is in another Bazel
target's data dependencies.
On Linux/macOS, the path and __file__ has the same root directory.
On Windows, bazel builds an .exe file and we need to use the MANIFEST file
the location the actual binary.
Args:
py_binary_name: string, the name of a py_binary that is in another Bazel
target's data dependencies.
Raises:
RuntimeError: Raised when it cannot locate the executable path.
"""
root, ext = os.path.splitext(py_binary_name)
suffix = 'py3' if _bazel_selected_py3 else 'py2'
py_binary_name = '{}_{}{}'.format(root, suffix, ext)
if os.name == 'nt':
py_binary_name += '.exe'
manifest_file = os.path.join(FLAGS.test_srcdir, 'MANIFEST')
workspace_name = os.environ['TEST_WORKSPACE']
manifest_entry = '{}/{}'.format(workspace_name, py_binary_name)
with open(manifest_file, 'r') as manifest_fd:
for line in manifest_fd:
tokens = line.strip().split(' ')
if len(tokens) != 2:
continue
if manifest_entry == tokens[0]:
return tokens[1]
raise RuntimeError(
'Cannot locate executable path for {}, MANIFEST file: {}.'.format(
py_binary_name, manifest_file))
else:
# NOTE: __file__ may be .py or .pyc, depending on how the module was
# loaded and executed.
path = __file__
# Use the package name to find the root directory: every dot is
# a directory, plus one for ourselves.
for _ in range(__name__.count('.') + 1):
path = os.path.dirname(path)
root_directory = path
return os.path.join(root_directory, py_binary_name)

View File

@@ -0,0 +1,33 @@
# Lint as: python3
# Copyright 2020 The Abseil Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Private module implementing async_wrapped method for wrapping async tests.
This is a separate private module so that parameterized still optionally
supports Python 2 syntax.
"""
import functools
import inspect
def async_wrapped(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
return await func(*args, **kwargs)
return wrapper
def iscoroutinefunction(func):
return inspect.iscoroutinefunction(func)

View File

@@ -0,0 +1,96 @@
# Copyright 2018 The Abseil Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TestResult implementing default output for test execution status."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import unittest
from absl.third_party import unittest3_backport
class TextTestResult(unittest3_backport.TextTestResult):
"""TestResult class that provides the default text result formatting."""
def __init__(self, stream, descriptions, verbosity):
# Disable the verbose per-test output from the superclass, since it would
# conflict with our customized output.
super(TextTestResult, self).__init__(stream, descriptions, 0)
self._per_test_output = verbosity > 0
def _print_status(self, tag, test):
if self._per_test_output:
test_id = test.id()
if test_id.startswith('__main__.'):
test_id = test_id[len('__main__.'):]
print('[%s] %s' % (tag, test_id), file=self.stream)
self.stream.flush()
def startTest(self, test):
super(TextTestResult, self).startTest(test)
self._print_status(' RUN ', test)
def addSuccess(self, test):
super(TextTestResult, self).addSuccess(test)
self._print_status(' OK ', test)
def addError(self, test, err):
super(TextTestResult, self).addError(test, err)
self._print_status(' FAILED ', test)
def addFailure(self, test, err):
super(TextTestResult, self).addFailure(test, err)
self._print_status(' FAILED ', test)
def addSkip(self, test, reason):
super(TextTestResult, self).addSkip(test, reason)
self._print_status(' SKIPPED ', test)
def addExpectedFailure(self, test, err):
super(TextTestResult, self).addExpectedFailure(test, err)
self._print_status(' OK ', test)
def addUnexpectedSuccess(self, test):
super(TextTestResult, self).addUnexpectedSuccess(test)
self._print_status(' FAILED ', test)
class TextTestRunner(unittest.TextTestRunner):
"""A test runner that produces formatted text results."""
_TEST_RESULT_CLASS = TextTestResult
# Set this to true at the class or instance level to run tests using a
# debug-friendly method (e.g, one that doesn't catch exceptions and interacts
# better with debuggers).
# Usually this is set using --pdb_post_mortem.
run_for_debugging = False
def run(self, test):
# type: (TestCase) -> TestResult
if self.run_for_debugging:
return self._run_debug(test)
else:
return super(TextTestRunner, self).run(test)
def _run_debug(self, test):
# type: (TestCase) -> TestResult
test.debug()
# Return an empty result to indicate success.
return self._makeResult()
def _makeResult(self):
return TextTestResult(self.stream, self.descriptions, self.verbosity)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,198 @@
# Copyright 2017 The Abseil Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Decorator and context manager for saving and restoring flag values.
There are many ways to save and restore. Always use the most convenient method
for a given use case.
Here are examples of each method. They all call do_stuff() while FLAGS.someflag
is temporarily set to 'foo'.
from absl.testing import flagsaver
# Use a decorator which can optionally override flags via arguments.
@flagsaver.flagsaver(someflag='foo')
def some_func():
do_stuff()
# Use a decorator which can optionally override flags with flagholders.
@flagsaver.flagsaver((module.FOO_FLAG, 'foo'), (other_mod.BAR_FLAG, 23))
def some_func():
do_stuff()
# Use a decorator which does not override flags itself.
@flagsaver.flagsaver
def some_func():
FLAGS.someflag = 'foo'
do_stuff()
# Use a context manager which can optionally override flags via arguments.
with flagsaver.flagsaver(someflag='foo'):
do_stuff()
# Save and restore the flag values yourself.
saved_flag_values = flagsaver.save_flag_values()
try:
FLAGS.someflag = 'foo'
do_stuff()
finally:
flagsaver.restore_flag_values(saved_flag_values)
We save and restore a shallow copy of each Flag object's __dict__ attribute.
This preserves all attributes of the flag, such as whether or not it was
overridden from its default value.
WARNING: Currently a flag that is saved and then deleted cannot be restored. An
exception will be raised. However if you *add* a flag after saving flag values,
and then restore flag values, the added flag will be deleted with no errors.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import functools
import inspect
from absl import flags
FLAGS = flags.FLAGS
def flagsaver(*args, **kwargs):
"""The main flagsaver interface. See module doc for usage."""
if not args:
return _FlagOverrider(**kwargs)
# args can be [func] if used as `@flagsaver` instead of `@flagsaver(...)`
if len(args) == 1 and callable(args[0]):
if kwargs:
raise ValueError(
"It's invalid to specify both positional and keyword parameters.")
func = args[0]
if inspect.isclass(func):
raise TypeError('@flagsaver.flagsaver cannot be applied to a class.')
return _wrap(func, {})
# args can be a list of (FlagHolder, value) pairs.
# In which case they augment any specified kwargs.
for arg in args:
if not isinstance(arg, tuple) or len(arg) != 2:
raise ValueError('Expected (FlagHolder, value) pair, found %r' % (arg,))
holder, value = arg
if not isinstance(holder, flags.FlagHolder):
raise ValueError('Expected (FlagHolder, value) pair, found %r' % (arg,))
if holder.name in kwargs:
raise ValueError('Cannot set --%s multiple times' % holder.name)
kwargs[holder.name] = value
return _FlagOverrider(**kwargs)
def save_flag_values(flag_values=FLAGS):
"""Returns copy of flag values as a dict.
Args:
flag_values: FlagValues, the FlagValues instance with which the flag will
be saved. This should almost never need to be overridden.
Returns:
Dictionary mapping keys to values. Keys are flag names, values are
corresponding __dict__ members. E.g. {'key': value_dict, ...}.
"""
return {name: _copy_flag_dict(flag_values[name]) for name in flag_values}
def restore_flag_values(saved_flag_values, flag_values=FLAGS):
"""Restores flag values based on the dictionary of flag values.
Args:
saved_flag_values: {'flag_name': value_dict, ...}
flag_values: FlagValues, the FlagValues instance from which the flag will
be restored. This should almost never need to be overridden.
"""
new_flag_names = list(flag_values)
for name in new_flag_names:
saved = saved_flag_values.get(name)
if saved is None:
# If __dict__ was not saved delete "new" flag.
delattr(flag_values, name)
else:
if flag_values[name].value != saved['_value']:
flag_values[name].value = saved['_value'] # Ensure C++ value is set.
flag_values[name].__dict__ = saved
def _wrap(func, overrides):
"""Creates a wrapper function that saves/restores flag values.
Args:
func: function object - This will be called between saving flags and
restoring flags.
overrides: {str: object} - Flag names mapped to their values. These flags
will be set after saving the original flag state.
Returns:
return value from func()
"""
@functools.wraps(func)
def _flagsaver_wrapper(*args, **kwargs):
"""Wrapper function that saves and restores flags."""
with _FlagOverrider(**overrides):
return func(*args, **kwargs)
return _flagsaver_wrapper
class _FlagOverrider(object):
"""Overrides flags for the duration of the decorated function call.
It also restores all original values of flags after decorated method
completes.
"""
def __init__(self, **overrides):
self._overrides = overrides
self._saved_flag_values = None
def __call__(self, func):
if inspect.isclass(func):
raise TypeError('flagsaver cannot be applied to a class.')
return _wrap(func, self._overrides)
def __enter__(self):
self._saved_flag_values = save_flag_values(FLAGS)
try:
FLAGS._set_attributes(**self._overrides)
except:
# It may fail because of flag validators.
restore_flag_values(self._saved_flag_values, FLAGS)
raise
def __exit__(self, exc_type, exc_value, traceback):
restore_flag_values(self._saved_flag_values, FLAGS)
def _copy_flag_dict(flag):
"""Returns a copy of the flag object's __dict__.
It's mostly a shallow copy of the __dict__, except it also does a shallow
copy of the validator list.
Args:
flag: flags.Flag, the flag to copy.
Returns:
A copy of the flag object's __dict__.
"""
copy = flag.__dict__.copy()
copy['_value'] = flag.value # Ensure correct restore for C++ flags.
copy['validators'] = list(flag.validators)
return copy

View File

@@ -0,0 +1,703 @@
# Copyright 2017 The Abseil Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Adds support for parameterized tests to Python's unittest TestCase class.
A parameterized test is a method in a test case that is invoked with different
argument tuples.
A simple example:
class AdditionExample(parameterized.TestCase):
@parameterized.parameters(
(1, 2, 3),
(4, 5, 9),
(1, 1, 3))
def testAddition(self, op1, op2, result):
self.assertEqual(result, op1 + op2)
Each invocation is a separate test case and properly isolated just
like a normal test method, with its own setUp/tearDown cycle. In the
example above, there are three separate testcases, one of which will
fail due to an assertion error (1 + 1 != 3).
Parameters for invididual test cases can be tuples (with positional parameters)
or dictionaries (with named parameters):
class AdditionExample(parameterized.TestCase):
@parameterized.parameters(
{'op1': 1, 'op2': 2, 'result': 3},
{'op1': 4, 'op2': 5, 'result': 9},
)
def testAddition(self, op1, op2, result):
self.assertEqual(result, op1 + op2)
If a parameterized test fails, the error message will show the
original test name and the parameters for that test.
The id method of the test, used internally by the unittest framework, is also
modified to show the arguments (but note that the name reported by `id()`
doesn't match the actual test name, see below). To make sure that test names
stay the same across several invocations, object representations like
>>> class Foo(object):
... pass
>>> repr(Foo())
'<__main__.Foo object at 0x23d8610>'
are turned into '<__main__.Foo>'. When selecting a subset of test cases to run
on the command-line, the test cases contain an index suffix for each argument
in the order they were passed to `parameters()` (eg. testAddition0,
testAddition1, etc.) This naming scheme is subject to change; for more reliable
and stable names, especially in test logs, use `named_parameters()` instead.
Tests using `named_parameters()` are similar to `parameters()`, except only
tuples or dicts of args are supported. For tuples, the first parameter arg
has to be a string (or an object that returns an apt name when converted via
str()). For dicts, a value for the key 'testcase_name' must be present and must
be a string (or an object that returns an apt name when converted via str()):
class NamedExample(parameterized.TestCase):
@parameterized.named_parameters(
('Normal', 'aa', 'aaa', True),
('EmptyPrefix', '', 'abc', True),
('BothEmpty', '', '', True))
def testStartsWith(self, prefix, string, result):
self.assertEqual(result, string.startswith(prefix))
class NamedExample(parameterized.TestCase):
@parameterized.named_parameters(
{'testcase_name': 'Normal',
'result': True, 'string': 'aaa', 'prefix': 'aa'},
{'testcase_name': 'EmptyPrefix',
'result': True, 'string': 'abc', 'prefix': ''},
{'testcase_name': 'BothEmpty',
'result': True, 'string': '', 'prefix': ''})
def testStartsWith(self, prefix, string, result):
self.assertEqual(result, string.startswith(prefix))
Named tests also have the benefit that they can be run individually
from the command line:
$ testmodule.py NamedExample.testStartsWithNormal
.
--------------------------------------------------------------------
Ran 1 test in 0.000s
OK
Parameterized Classes
=====================
If invocation arguments are shared across test methods in a single
TestCase class, instead of decorating all test methods
individually, the class itself can be decorated:
@parameterized.parameters(
(1, 2, 3),
(4, 5, 9))
class ArithmeticTest(parameterized.TestCase):
def testAdd(self, arg1, arg2, result):
self.assertEqual(arg1 + arg2, result)
def testSubtract(self, arg1, arg2, result):
self.assertEqual(result - arg1, arg2)
Inputs from Iterables
=====================
If parameters should be shared across several test cases, or are dynamically
created from other sources, a single non-tuple iterable can be passed into
the decorator. This iterable will be used to obtain the test cases:
class AdditionExample(parameterized.TestCase):
@parameterized.parameters(
c.op1, c.op2, c.result for c in testcases
)
def testAddition(self, op1, op2, result):
self.assertEqual(result, op1 + op2)
Single-Argument Test Methods
============================
If a test method takes only one argument, the single arguments must not be
wrapped into a tuple:
class NegativeNumberExample(parameterized.TestCase):
@parameterized.parameters(
-1, -3, -4, -5
)
def testIsNegative(self, arg):
self.assertTrue(IsNegative(arg))
List/tuple as a Single Argument
===============================
If a test method takes a single argument of a list/tuple, it must be wrapped
inside a tuple:
class ZeroSumExample(parameterized.TestCase):
@parameterized.parameters(
([-1, 0, 1], ),
([-2, 0, 2], ),
)
def testSumIsZero(self, arg):
self.assertEqual(0, sum(arg))
Cartesian product of Parameter Values as Parametrized Test Cases
======================================================
If required to test method over a cartesian product of parameters,
`parameterized.product` may be used to facilitate generation of parameters
test combinations:
class TestModuloExample(parameterized.TestCase):
@parameterized.product(
num=[0, 20, 80],
modulo=[2, 4],
expected=[0]
)
def testModuloResult(self, num, modulo, expected):
self.assertEqual(expected, num % modulo)
This results in 6 test cases being created - one for each combination of the
parameters. It is also possible to supply sequences of keyword argument dicts
as elements of the cartesian product:
@parameterized.product(
(dict(num=5, modulo=3, expected=2),
dict(num=7, modulo=4, expected=3)),
dtype=(int, float)
)
def testModuloResult(self, num, modulo, expected, dtype):
self.assertEqual(expected, dtype(num) % modulo)
This results in 4 test cases being created - for each of the two sets of test
data (supplied as kwarg dicts) and for each of the two data types (supplied as
a named parameter). Multiple keyword argument dicts may be supplied if required.
Async Support
===============================
If a test needs to call async functions, it can inherit from both
parameterized.TestCase and another TestCase that supports async calls, such
as [asynctest](https://github.com/Martiusweb/asynctest):
import asynctest
class AsyncExample(parameterized.TestCase, asynctest.TestCase):
@parameterized.parameters(
('a', 1),
('b', 2),
)
async def testSomeAsyncFunction(self, arg, expected):
actual = await someAsyncFunction(arg)
self.assertEqual(actual, expected)
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import functools
import itertools
import re
import types
import unittest
from absl._collections_abc import abc
from absl.testing import absltest
import six
try:
from absl.testing import _parameterized_async
except (ImportError, SyntaxError):
_parameterized_async = None
_ADDR_RE = re.compile(r'\<([a-zA-Z0-9_\-\.]+) object at 0x[a-fA-F0-9]+\>')
_NAMED = object()
_ARGUMENT_REPR = object()
_NAMED_DICT_KEY = 'testcase_name'
class NoTestsError(Exception):
"""Raised when parameterized decorators do not generate any tests."""
class DuplicateTestNameError(Exception):
"""Raised when a parameterized test has the same test name multiple times."""
def __init__(self, test_class_name, new_test_name, original_test_name):
super(DuplicateTestNameError, self).__init__(
'Duplicate parameterized test name in {}: generated test name {!r} '
'(generated from {!r}) already exists. Consider using '
'named_parameters() to give your tests unique names and/or renaming '
'the conflicting test method.'.format(
test_class_name, new_test_name, original_test_name))
def _clean_repr(obj):
return _ADDR_RE.sub(r'<\1>', repr(obj))
def _non_string_or_bytes_iterable(obj):
return (isinstance(obj, abc.Iterable) and
not isinstance(obj, six.text_type) and
not isinstance(obj, six.binary_type))
def _format_parameter_list(testcase_params):
if isinstance(testcase_params, abc.Mapping):
return ', '.join('%s=%s' % (argname, _clean_repr(value))
for argname, value in six.iteritems(testcase_params))
elif _non_string_or_bytes_iterable(testcase_params):
return ', '.join(map(_clean_repr, testcase_params))
else:
return _format_parameter_list((testcase_params,))
class _ParameterizedTestIter(object):
"""Callable and iterable class for producing new test cases."""
def __init__(self, test_method, testcases, naming_type, original_name=None):
"""Returns concrete test functions for a test and a list of parameters.
The naming_type is used to determine the name of the concrete
functions as reported by the unittest framework. If naming_type is
_FIRST_ARG, the testcases must be tuples, and the first element must
have a string representation that is a valid Python identifier.
Args:
test_method: The decorated test method.
testcases: (list of tuple/dict) A list of parameter tuples/dicts for
individual test invocations.
naming_type: The test naming type, either _NAMED or _ARGUMENT_REPR.
original_name: The original test method name. When decorated on a test
method, None is passed to __init__ and test_method.__name__ is used.
Note test_method.__name__ might be different than the original defined
test method because of the use of other decorators. A more accurate
value is set by TestGeneratorMetaclass.__new__ later.
"""
self._test_method = test_method
self.testcases = testcases
self._naming_type = naming_type
if original_name is None:
original_name = test_method.__name__
self._original_name = original_name
self.__name__ = _ParameterizedTestIter.__name__
def __call__(self, *args, **kwargs):
raise RuntimeError('You appear to be running a parameterized test case '
'without having inherited from parameterized.'
'TestCase. This is bad because none of '
'your test cases are actually being run. You may also '
'be using another decorator before the parameterized '
'one, in which case you should reverse the order.')
def __iter__(self):
test_method = self._test_method
naming_type = self._naming_type
def make_bound_param_test(testcase_params):
@functools.wraps(test_method)
def bound_param_test(self):
if isinstance(testcase_params, abc.Mapping):
return test_method(self, **testcase_params)
elif _non_string_or_bytes_iterable(testcase_params):
return test_method(self, *testcase_params)
else:
return test_method(self, testcase_params)
if naming_type is _NAMED:
# Signal the metaclass that the name of the test function is unique
# and descriptive.
bound_param_test.__x_use_name__ = True
testcase_name = None
if isinstance(testcase_params, abc.Mapping):
if _NAMED_DICT_KEY not in testcase_params:
raise RuntimeError(
'Dict for named tests must contain key "%s"' % _NAMED_DICT_KEY)
# Create a new dict to avoid modifying the supplied testcase_params.
testcase_name = testcase_params[_NAMED_DICT_KEY]
testcase_params = {k: v for k, v in six.iteritems(testcase_params)
if k != _NAMED_DICT_KEY}
elif _non_string_or_bytes_iterable(testcase_params):
if not isinstance(testcase_params[0], six.string_types):
raise RuntimeError(
'The first element of named test parameters is the test name '
'suffix and must be a string')
testcase_name = testcase_params[0]
testcase_params = testcase_params[1:]
else:
raise RuntimeError(
'Named tests must be passed a dict or non-string iterable.')
test_method_name = self._original_name
# Support PEP-8 underscore style for test naming if used.
if (test_method_name.startswith('test_')
and testcase_name
and not testcase_name.startswith('_')):
test_method_name += '_'
bound_param_test.__name__ = test_method_name + str(testcase_name)
elif naming_type is _ARGUMENT_REPR:
# If it's a generator, convert it to a tuple and treat them as
# parameters.
if isinstance(testcase_params, types.GeneratorType):
testcase_params = tuple(testcase_params)
# The metaclass creates a unique, but non-descriptive method name for
# _ARGUMENT_REPR tests using an indexed suffix.
# To keep test names descriptive, only the original method name is used.
# To make sure test names are unique, we add a unique descriptive suffix
# __x_params_repr__ for every test.
params_repr = '(%s)' % (_format_parameter_list(testcase_params),)
bound_param_test.__x_params_repr__ = params_repr
else:
raise RuntimeError('%s is not a valid naming type.' % (naming_type,))
bound_param_test.__doc__ = '%s(%s)' % (
bound_param_test.__name__, _format_parameter_list(testcase_params))
if test_method.__doc__:
bound_param_test.__doc__ += '\n%s' % (test_method.__doc__,)
if (_parameterized_async and
_parameterized_async.iscoroutinefunction(test_method)):
return _parameterized_async.async_wrapped(bound_param_test)
return bound_param_test
return (make_bound_param_test(c) for c in self.testcases)
def _modify_class(class_object, testcases, naming_type):
assert not getattr(class_object, '_test_params_reprs', None), (
'Cannot add parameters to %s. Either it already has parameterized '
'methods, or its super class is also a parameterized class.' % (
class_object,))
# NOTE: _test_params_repr is private to parameterized.TestCase and it's
# metaclass; do not use it outside of those classes.
class_object._test_params_reprs = test_params_reprs = {}
for name, obj in six.iteritems(class_object.__dict__.copy()):
if (name.startswith(unittest.TestLoader.testMethodPrefix)
and isinstance(obj, types.FunctionType)):
delattr(class_object, name)
methods = {}
_update_class_dict_for_param_test_case(
class_object.__name__, methods, test_params_reprs, name,
_ParameterizedTestIter(obj, testcases, naming_type, name))
for meth_name, meth in six.iteritems(methods):
setattr(class_object, meth_name, meth)
def _parameter_decorator(naming_type, testcases):
"""Implementation of the parameterization decorators.
Args:
naming_type: The naming type.
testcases: Testcase parameters.
Raises:
NoTestsError: Raised when the decorator generates no tests.
Returns:
A function for modifying the decorated object.
"""
def _apply(obj):
if isinstance(obj, type):
_modify_class(obj, testcases, naming_type)
return obj
else:
return _ParameterizedTestIter(obj, testcases, naming_type)
if (len(testcases) == 1 and
not isinstance(testcases[0], tuple) and
not isinstance(testcases[0], abc.Mapping)):
# Support using a single non-tuple parameter as a list of test cases.
# Note that the single non-tuple parameter can't be Mapping either, which
# means a single dict parameter case.
assert _non_string_or_bytes_iterable(testcases[0]), (
'Single parameter argument must be a non-string non-Mapping iterable')
testcases = testcases[0]
if not isinstance(testcases, abc.Sequence):
testcases = list(testcases)
if not testcases:
raise NoTestsError(
'parameterized test decorators did not generate any tests. '
'Make sure you specify non-empty parameters, '
'and do not reuse generators more than once.')
return _apply
def parameters(*testcases):
"""A decorator for creating parameterized tests.
See the module docstring for a usage example.
Args:
*testcases: Parameters for the decorated method, either a single
iterable, or a list of tuples/dicts/objects (for tests with only one
argument).
Raises:
NoTestsError: Raised when the decorator generates no tests.
Returns:
A test generator to be handled by TestGeneratorMetaclass.
"""
return _parameter_decorator(_ARGUMENT_REPR, testcases)
def named_parameters(*testcases):
"""A decorator for creating parameterized tests.
See the module docstring for a usage example. For every parameter tuple
passed, the first element of the tuple should be a string and will be appended
to the name of the test method. Each parameter dict passed must have a value
for the key "testcase_name", the string representation of that value will be
appended to the name of the test method.
Args:
*testcases: Parameters for the decorated method, either a single iterable,
or a list of tuples or dicts.
Raises:
NoTestsError: Raised when the decorator generates no tests.
Returns:
A test generator to be handled by TestGeneratorMetaclass.
"""
return _parameter_decorator(_NAMED, testcases)
def product(*kwargs_seqs, **testgrid):
"""A decorator for running tests over cartesian product of parameters values.
See the module docstring for a usage example. The test will be run for every
possible combination of the parameters.
Args:
*kwargs_seqs: Each positional parameter is a sequence of keyword arg dicts;
every test case generated will include exactly one kwargs dict from each
positional parameter; these will then be merged to form an overall list
of arguments for the test case.
**testgrid: A mapping of parameter names and their possible values. Possible
values should given as either a list or a tuple.
Raises:
NoTestsError: Raised when the decorator generates no tests.
Returns:
A test generator to be handled by TestGeneratorMetaclass.
"""
for name, values in testgrid.items():
assert isinstance(values, (list, tuple)), (
'Values of {} must be given as list or tuple, found {}'.format(
name, type(values)))
prior_arg_names = set()
for kwargs_seq in kwargs_seqs:
assert ((isinstance(kwargs_seq, (list, tuple))) and
all(isinstance(kwargs, dict) for kwargs in kwargs_seq)), (
'Positional parameters must be a sequence of keyword arg'
'dicts, found {}'
.format(kwargs_seq))
if kwargs_seq:
arg_names = set(kwargs_seq[0])
assert all(set(kwargs) == arg_names for kwargs in kwargs_seq), (
'Keyword argument dicts within a single parameter must all have the '
'same keys, found {}'.format(kwargs_seq))
assert not (arg_names & prior_arg_names), (
'Keyword argument dict sequences must all have distinct argument '
'names, found duplicate(s) {}'
.format(sorted(arg_names & prior_arg_names)))
prior_arg_names |= arg_names
assert not (prior_arg_names & set(testgrid)), (
'Arguments supplied in kwargs dicts in positional parameters must not '
'overlap with arguments supplied as named parameters; found duplicate '
'argument(s) {}'.format(sorted(prior_arg_names & set(testgrid))))
# Convert testgrid into a sequence of sequences of kwargs dicts and combine
# with the positional parameters.
# So foo=[1,2], bar=[3,4] --> [[{foo: 1}, {foo: 2}], [{bar: 3, bar: 4}]]
testgrid = (tuple({k: v} for v in vs) for k, vs in testgrid.items())
testgrid = tuple(kwargs_seqs) + tuple(testgrid)
# Create all possible combinations of parameters as a cartesian product
# of parameter values.
testcases = [
dict(itertools.chain.from_iterable(case.items()
for case in cases))
for cases in itertools.product(*testgrid)
]
return _parameter_decorator(_ARGUMENT_REPR, testcases)
class TestGeneratorMetaclass(type):
"""Metaclass for adding tests generated by parameterized decorators."""
def __new__(cls, class_name, bases, dct):
# NOTE: _test_params_repr is private to parameterized.TestCase and it's
# metaclass; do not use it outside of those classes.
test_params_reprs = dct.setdefault('_test_params_reprs', {})
for name, obj in six.iteritems(dct.copy()):
if (name.startswith(unittest.TestLoader.testMethodPrefix) and
_non_string_or_bytes_iterable(obj)):
# NOTE: `obj` might not be a _ParameterizedTestIter in two cases:
# 1. a class-level iterable named test* that isn't a test, such as
# a list of something. Such attributes get deleted from the class.
#
# 2. If a decorator is applied to the parameterized test, e.g.
# @morestuff
# @parameterized.parameters(...)
# def test_foo(...): ...
#
# This is OK so long as the underlying parameterized function state
# is forwarded (e.g. using functool.wraps() and **without**
# accessing explicitly accessing the internal attributes.
if isinstance(obj, _ParameterizedTestIter):
# Update the original test method name so it's more accurate.
# The mismatch might happen when another decorator is used inside
# the parameterized decrators, and the inner decorator doesn't
# preserve its __name__.
obj._original_name = name
iterator = iter(obj)
dct.pop(name)
_update_class_dict_for_param_test_case(
class_name, dct, test_params_reprs, name, iterator)
# If the base class is a subclass of parameterized.TestCase, inherit its
# _test_params_reprs too.
for base in bases:
# Check if the base has _test_params_reprs first, then check if it's a
# subclass of parameterized.TestCase. Otherwise when this is called for
# the parameterized.TestCase definition itself, this raises because
# itself is not defined yet. This works as long as absltest.TestCase does
# not define _test_params_reprs.
base_test_params_reprs = getattr(base, '_test_params_reprs', None)
if base_test_params_reprs and issubclass(base, TestCase):
for test_method, test_method_id in base_test_params_reprs.items():
# test_method may both exists in base and this class.
# This class's method overrides base class's.
# That's why it should only inherit it if it does not exist.
test_params_reprs.setdefault(test_method, test_method_id)
return type.__new__(cls, class_name, bases, dct)
def _update_class_dict_for_param_test_case(
test_class_name, dct, test_params_reprs, name, iterator):
"""Adds individual test cases to a dictionary.
Args:
test_class_name: The name of the class tests are added to.
dct: The target dictionary.
test_params_reprs: The dictionary for mapping names to test IDs.
name: The original name of the test case.
iterator: The iterator generating the individual test cases.
Raises:
DuplicateTestNameError: Raised when a test name occurs multiple times.
RuntimeError: If non-parameterized functions are generated.
"""
for idx, func in enumerate(iterator):
assert callable(func), 'Test generators must yield callables, got %r' % (
func,)
if not (getattr(func, '__x_use_name__', None) or
getattr(func, '__x_params_repr__', None)):
raise RuntimeError(
'{}.{} generated a test function without using the parameterized '
'decorators. Only tests generated using the decorators are '
'supported.'.format(test_class_name, name))
if getattr(func, '__x_use_name__', False):
original_name = func.__name__
new_name = original_name
else:
original_name = name
new_name = '%s%d' % (original_name, idx)
if new_name in dct:
raise DuplicateTestNameError(test_class_name, new_name, original_name)
dct[new_name] = func
test_params_reprs[new_name] = getattr(func, '__x_params_repr__', '')
@six.add_metaclass(TestGeneratorMetaclass)
class TestCase(absltest.TestCase):
"""Base class for test cases using the parameters decorator."""
# visibility: private; do not call outside this class.
def _get_params_repr(self):
return self._test_params_reprs.get(self._testMethodName, '')
def __str__(self):
params_repr = self._get_params_repr()
if params_repr:
params_repr = ' ' + params_repr
return '{}{} ({})'.format(
self._testMethodName, params_repr,
unittest.util.strclass(self.__class__))
def id(self):
"""Returns the descriptive ID of the test.
This is used internally by the unittesting framework to get a name
for the test to be used in reports.
Returns:
The test id.
"""
base = super(TestCase, self).id()
params_repr = self._get_params_repr()
if params_repr:
# We include the params in the id so that, when reported in the
# test.xml file, the value is more informative than just "test_foo0".
# Use a space to separate them so that it's copy/paste friendly and
# easy to identify the actual test id.
return '{} {}'.format(base, params_repr)
else:
return base
# This function is kept CamelCase because it's used as a class's base class.
def CoopTestCase(other_base_class): # pylint: disable=invalid-name
"""Returns a new base class with a cooperative metaclass base.
This enables the TestCase to be used in combination
with other base classes that have custom metaclasses, such as
mox.MoxTestBase.
Only works with metaclasses that do not override type.__new__.
Example:
from absl.testing import parameterized
class ExampleTest(parameterized.CoopTestCase(OtherTestCase)):
...
Args:
other_base_class: (class) A test case base class.
Returns:
A new class object.
"""
metaclass = type(
'CoopMetaclass',
(other_base_class.__metaclass__,
TestGeneratorMetaclass), {})
return metaclass(
'CoopTestCase',
(other_base_class, TestCase), {})

View File

@@ -0,0 +1,573 @@
# Copyright 2017 The Abseil Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A Python test reporter that generates test reports in JUnit XML format."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import datetime
import re
import sys
import threading
import time
import traceback
import unittest
from xml.sax import saxutils
from absl.testing import _pretty_print_reporter
from absl.third_party import unittest3_backport
import six
# See http://www.w3.org/TR/REC-xml/#NT-Char
_bad_control_character_codes = set(range(0, 0x20)) - {0x9, 0xA, 0xD}
_control_character_conversions = {
chr(i): '\\x{:02x}'.format(i) for i in _bad_control_character_codes}
_escape_xml_attr_conversions = {
'"': '&quot;',
"'": '&apos;',
'\n': '&#xA;',
'\t': '&#x9;',
'\r': '&#xD;',
' ': '&#x20;'}
_escape_xml_attr_conversions.update(_control_character_conversions)
# When class or module level function fails, unittest/suite.py adds a
# _ErrorHolder instance instead of a real TestCase, and it has a description
# like "setUpClass (__main__.MyTestCase)".
_CLASS_OR_MODULE_LEVEL_TEST_DESC_REGEX = re.compile(r'^(\w+) \((\S+)\)$')
# NOTE: while saxutils.quoteattr() theoretically does the same thing; it
# seems to often end up being too smart for it's own good not escaping properly.
# This function is much more reliable.
def _escape_xml_attr(content):
"""Escapes xml attributes."""
# Note: saxutils doesn't escape the quotes.
return saxutils.escape(content, _escape_xml_attr_conversions)
def _escape_cdata(s):
"""Escapes a string to be used as XML CDATA.
CDATA characters are treated strictly as character data, not as XML markup,
but there are still certain restrictions on them.
Args:
s: the string to be escaped.
Returns:
An escaped version of the input string.
"""
for char, escaped in six.iteritems(_control_character_conversions):
s = s.replace(char, escaped)
return s.replace(']]>', ']] >')
def _iso8601_timestamp(timestamp):
"""Produces an ISO8601 datetime.
Args:
timestamp: an Epoch based timestamp in seconds.
Returns:
A iso8601 format timestamp if the input is a valid timestamp, None otherwise
"""
if timestamp is None or timestamp < 0:
return None
# Use utcfromtimestamp in PY2 because it doesn't have a built-in UTC object
if six.PY2:
return '%s+00:00' % datetime.datetime.utcfromtimestamp(
timestamp).isoformat()
else:
return datetime.datetime.fromtimestamp(
timestamp, tz=datetime.timezone.utc).isoformat()
def _print_xml_element_header(element, attributes, stream, indentation=''):
"""Prints an XML header of an arbitrary element.
Args:
element: element name (testsuites, testsuite, testcase)
attributes: 2-tuple list with (attributes, values) already escaped
stream: output stream to write test report XML to
indentation: indentation added to the element header
"""
stream.write('%s<%s' % (indentation, element))
for attribute in attributes:
if len(attribute) == 2 \
and attribute[0] is not None and attribute[1] is not None:
stream.write(' %s="%s"' % (attribute[0], attribute[1]))
stream.write('>\n')
# Copy time.time which ensures the real time is used internally.
# This prevents bad interactions with tests that stub out time.
_time_copy = time.time
if hasattr(traceback, '_some_str'):
# Use the traceback module str function to format safely.
_safe_str = traceback._some_str
else:
_safe_str = str # pylint: disable=invalid-name
class _TestCaseResult(object):
"""Private helper for _TextAndXMLTestResult that represents a test result.
Attributes:
test: A TestCase instance of an individual test method.
name: The name of the individual test method.
full_class_name: The full name of the test class.
run_time: The duration (in seconds) it took to run the test.
start_time: Epoch relative timestamp of when test started (in seconds)
errors: A list of error 4-tuples. Error tuple entries are
1) a string identifier of either "failure" or "error"
2) an exception_type
3) an exception_message
4) a string version of a sys.exc_info()-style tuple of values
('error', err[0], err[1], self._exc_info_to_string(err))
If the length of errors is 0, then the test is either passed or
skipped.
skip_reason: A string explaining why the test was skipped.
"""
def __init__(self, test):
self.run_time = -1
self.start_time = -1
self.skip_reason = None
self.errors = []
self.test = test
# Parse the test id to get its test name and full class path.
# Unfortunately there is no better way of knowning the test and class.
# Worse, unittest uses _ErrorHandler instances to represent class / module
# level failures.
test_desc = test.id() or str(test)
# Check if it's something like "setUpClass (__main__.TestCase)".
match = _CLASS_OR_MODULE_LEVEL_TEST_DESC_REGEX.match(test_desc)
if match:
name = match.group(1)
full_class_name = match.group(2)
else:
class_name = unittest.util.strclass(test.__class__)
if ((six.PY3 and isinstance(test, unittest.case._SubTest)) or
(six.PY2 and isinstance(test, unittest3_backport.case._SubTest))):
# If the test case is a _SubTest, the real TestCase instance is
# available as _SubTest.test_case.
class_name = unittest.util.strclass(test.test_case.__class__)
if test_desc.startswith(class_name + '.'):
# In a typical unittest.TestCase scenario, test.id() returns with
# a class name formatted using unittest.util.strclass.
name = test_desc[len(class_name)+1:]
full_class_name = class_name
else:
# Otherwise make a best effort to guess the test name and full class
# path.
parts = test_desc.rsplit('.', 1)
name = parts[-1]
full_class_name = parts[0] if len(parts) == 2 else ''
self.name = _escape_xml_attr(name)
self.full_class_name = _escape_xml_attr(full_class_name)
def set_run_time(self, time_in_secs):
self.run_time = time_in_secs
def set_start_time(self, time_in_secs):
self.start_time = time_in_secs
def print_xml_summary(self, stream):
"""Prints an XML Summary of a TestCase.
Status and result are populated as per JUnit XML test result reporter.
A test that has been skipped will always have a skip reason,
as every skip method in Python's unittest requires the reason arg to be
passed.
Args:
stream: output stream to write test report XML to
"""
if self.skip_reason is None:
status = 'run'
result = 'completed'
else:
status = 'notrun'
result = 'suppressed'
test_case_attributes = [
('name', '%s' % self.name),
('status', '%s' % status),
('result', '%s' % result),
('time', '%.1f' % self.run_time),
('classname', self.full_class_name),
('timestamp', _iso8601_timestamp(self.start_time)),
]
_print_xml_element_header('testcase', test_case_attributes, stream, ' ')
self._print_testcase_details(stream)
stream.write(' </testcase>\n')
def _print_testcase_details(self, stream):
for error in self.errors:
outcome, exception_type, message, error_msg = error # pylint: disable=unpacking-non-sequence
message = _escape_xml_attr(_safe_str(message))
exception_type = _escape_xml_attr(str(exception_type))
error_msg = _escape_cdata(error_msg)
stream.write(' <%s message="%s" type="%s"><![CDATA[%s]]></%s>\n'
% (outcome, message, exception_type, error_msg, outcome))
class _TestSuiteResult(object):
"""Private helper for _TextAndXMLTestResult."""
def __init__(self):
self.suites = {}
self.failure_counts = {}
self.error_counts = {}
self.overall_start_time = -1
self.overall_end_time = -1
self._testsuites_properties = {}
def add_test_case_result(self, test_case_result):
suite_name = type(test_case_result.test).__name__
if suite_name == '_ErrorHolder':
# _ErrorHolder is a special case created by unittest for class / module
# level functions.
suite_name = test_case_result.full_class_name.rsplit('.')[-1]
if ((six.PY3 and
isinstance(test_case_result.test, unittest.case._SubTest)) or
(six.PY2 and
isinstance(test_case_result.test, unittest3_backport.case._SubTest))):
# If the test case is a _SubTest, the real TestCase instance is
# available as _SubTest.test_case.
suite_name = type(test_case_result.test.test_case).__name__
self._setup_test_suite(suite_name)
self.suites[suite_name].append(test_case_result)
for error in test_case_result.errors:
# Only count the first failure or error so that the sum is equal to the
# total number of *testcases* that have failures or errors.
if error[0] == 'failure':
self.failure_counts[suite_name] += 1
break
elif error[0] == 'error':
self.error_counts[suite_name] += 1
break
def print_xml_summary(self, stream):
overall_test_count = sum(len(x) for x in self.suites.values())
overall_failures = sum(self.failure_counts.values())
overall_errors = sum(self.error_counts.values())
overall_attributes = [
('name', ''),
('tests', '%d' % overall_test_count),
('failures', '%d' % overall_failures),
('errors', '%d' % overall_errors),
('time', '%.1f' % (self.overall_end_time - self.overall_start_time)),
('timestamp', _iso8601_timestamp(self.overall_start_time)),
]
_print_xml_element_header('testsuites', overall_attributes, stream)
if self._testsuites_properties:
stream.write(' <properties>\n')
for name, value in sorted(six.iteritems(self._testsuites_properties)):
stream.write(' <property name="%s" value="%s"></property>\n' %
(_escape_xml_attr(name), _escape_xml_attr(str(value))))
stream.write(' </properties>\n')
for suite_name in self.suites:
suite = self.suites[suite_name]
suite_end_time = max(x.start_time + x.run_time for x in suite)
suite_start_time = min(x.start_time for x in suite)
failures = self.failure_counts[suite_name]
errors = self.error_counts[suite_name]
suite_attributes = [
('name', '%s' % suite_name),
('tests', '%d' % len(suite)),
('failures', '%d' % failures),
('errors', '%d' % errors),
('time', '%.1f' % (suite_end_time - suite_start_time)),
('timestamp', _iso8601_timestamp(suite_start_time)),
]
_print_xml_element_header('testsuite', suite_attributes, stream)
for test_case_result in suite:
test_case_result.print_xml_summary(stream)
stream.write('</testsuite>\n')
stream.write('</testsuites>\n')
def _setup_test_suite(self, suite_name):
"""Adds a test suite to the set of suites tracked by this test run.
Args:
suite_name: string, The name of the test suite being initialized.
"""
if suite_name in self.suites:
return
self.suites[suite_name] = []
self.failure_counts[suite_name] = 0
self.error_counts[suite_name] = 0
def set_end_time(self, timestamp_in_secs):
"""Sets the start timestamp of this test suite.
Args:
timestamp_in_secs: timestamp in seconds since epoch
"""
self.overall_end_time = timestamp_in_secs
def set_start_time(self, timestamp_in_secs):
"""Sets the end timestamp of this test suite.
Args:
timestamp_in_secs: timestamp in seconds since epoch
"""
self.overall_start_time = timestamp_in_secs
class _TextAndXMLTestResult(_pretty_print_reporter.TextTestResult):
"""Private TestResult class that produces both formatted text results and XML.
Used by TextAndXMLTestRunner.
"""
_TEST_SUITE_RESULT_CLASS = _TestSuiteResult
_TEST_CASE_RESULT_CLASS = _TestCaseResult
def __init__(self, xml_stream, stream, descriptions, verbosity,
time_getter=_time_copy, testsuites_properties=None):
super(_TextAndXMLTestResult, self).__init__(stream, descriptions, verbosity)
self.xml_stream = xml_stream
self.pending_test_case_results = {}
self.suite = self._TEST_SUITE_RESULT_CLASS()
if testsuites_properties:
self.suite._testsuites_properties = testsuites_properties
self.time_getter = time_getter
# This lock guards any mutations on pending_test_case_results.
self._pending_test_case_results_lock = threading.RLock()
def startTest(self, test):
self.start_time = self.time_getter()
super(_TextAndXMLTestResult, self).startTest(test)
def stopTest(self, test):
# Grabbing the write lock to avoid conflicting with stopTestRun.
with self._pending_test_case_results_lock:
super(_TextAndXMLTestResult, self).stopTest(test)
result = self.get_pending_test_case_result(test)
if not result:
test_name = test.id() or str(test)
sys.stderr.write('No pending test case: %s\n' % test_name)
return
test_id = id(test)
run_time = self.time_getter() - self.start_time
result.set_run_time(run_time)
result.set_start_time(self.start_time)
self.suite.add_test_case_result(result)
del self.pending_test_case_results[test_id]
def startTestRun(self):
self.suite.set_start_time(self.time_getter())
super(_TextAndXMLTestResult, self).startTestRun()
def stopTestRun(self):
self.suite.set_end_time(self.time_getter())
# All pending_test_case_results will be added to the suite and removed from
# the pending_test_case_results dictionary. Grabing the write lock to avoid
# results from being added during this process to avoid duplicating adds or
# accidentally erasing newly appended pending results.
with self._pending_test_case_results_lock:
# Errors in the test fixture (setUpModule, tearDownModule,
# setUpClass, tearDownClass) can leave a pending result which
# never gets added to the suite. The runner calls stopTestRun
# which gives us an opportunity to add these errors for
# reporting here.
for test_id in self.pending_test_case_results:
result = self.pending_test_case_results[test_id]
if hasattr(self, 'start_time'):
run_time = self.suite.overall_end_time - self.start_time
result.set_run_time(run_time)
result.set_start_time(self.start_time)
self.suite.add_test_case_result(result)
self.pending_test_case_results.clear()
def _exc_info_to_string(self, err, test=None):
"""Converts a sys.exc_info()-style tuple of values into a string.
This method must be overridden because the method signature in
unittest.TestResult changed between Python 2.2 and 2.4.
Args:
err: A sys.exc_info() tuple of values for an error.
test: The test method.
Returns:
A formatted exception string.
"""
if test:
return super(_TextAndXMLTestResult, self)._exc_info_to_string(err, test)
return ''.join(traceback.format_exception(*err))
def add_pending_test_case_result(self, test, error_summary=None,
skip_reason=None):
"""Adds result information to a test case result which may still be running.
If a result entry for the test already exists, add_pending_test_case_result
will add error summary tuples and/or overwrite skip_reason for the result.
If it does not yet exist, a result entry will be created.
Note that a test result is considered to have been run and passed
only if there are no errors or skip_reason.
Args:
test: A test method as defined by unittest
error_summary: A 4-tuple with the following entries:
1) a string identifier of either "failure" or "error"
2) an exception_type
3) an exception_message
4) a string version of a sys.exc_info()-style tuple of values
('error', err[0], err[1], self._exc_info_to_string(err))
If the length of errors is 0, then the test is either passed or
skipped.
skip_reason: a string explaining why the test was skipped
"""
with self._pending_test_case_results_lock:
test_id = id(test)
if test_id not in self.pending_test_case_results:
self.pending_test_case_results[test_id] = self._TEST_CASE_RESULT_CLASS(
test)
if error_summary:
self.pending_test_case_results[test_id].errors.append(error_summary)
if skip_reason:
self.pending_test_case_results[test_id].skip_reason = skip_reason
def delete_pending_test_case_result(self, test):
with self._pending_test_case_results_lock:
test_id = id(test)
del self.pending_test_case_results[test_id]
def get_pending_test_case_result(self, test):
test_id = id(test)
return self.pending_test_case_results.get(test_id, None)
def addSuccess(self, test):
super(_TextAndXMLTestResult, self).addSuccess(test)
self.add_pending_test_case_result(test)
def addError(self, test, err):
super(_TextAndXMLTestResult, self).addError(test, err)
error_summary = ('error', err[0], err[1],
self._exc_info_to_string(err, test=test))
self.add_pending_test_case_result(test, error_summary=error_summary)
def addFailure(self, test, err):
super(_TextAndXMLTestResult, self).addFailure(test, err)
error_summary = ('failure', err[0], err[1],
self._exc_info_to_string(err, test=test))
self.add_pending_test_case_result(test, error_summary=error_summary)
def addSkip(self, test, reason):
super(_TextAndXMLTestResult, self).addSkip(test, reason)
self.add_pending_test_case_result(test, skip_reason=reason)
def addExpectedFailure(self, test, err):
super(_TextAndXMLTestResult, self).addExpectedFailure(test, err)
if callable(getattr(test, 'recordProperty', None)):
test.recordProperty('EXPECTED_FAILURE',
self._exc_info_to_string(err, test=test))
self.add_pending_test_case_result(test)
def addUnexpectedSuccess(self, test):
super(_TextAndXMLTestResult, self).addUnexpectedSuccess(test)
test_name = test.id() or str(test)
error_summary = ('error', '', '',
'Test case %s should have failed, but passed.'
% (test_name))
self.add_pending_test_case_result(test, error_summary=error_summary)
def addSubTest(self, test, subtest, err): # pylint: disable=invalid-name
super(_TextAndXMLTestResult, self).addSubTest(test, subtest, err)
if err is not None:
if issubclass(err[0], test.failureException):
error_summary = ('failure', err[0], err[1],
self._exc_info_to_string(err, test=test))
else:
error_summary = ('error', err[0], err[1],
self._exc_info_to_string(err, test=test))
else:
error_summary = None
self.add_pending_test_case_result(subtest, error_summary=error_summary)
def printErrors(self):
super(_TextAndXMLTestResult, self).printErrors()
self.xml_stream.write('<?xml version="1.0"?>\n')
self.suite.print_xml_summary(self.xml_stream)
class TextAndXMLTestRunner(unittest.TextTestRunner):
"""A test runner that produces both formatted text results and XML.
It prints out the names of tests as they are run, errors as they
occur, and a summary of the results at the end of the test run.
"""
_TEST_RESULT_CLASS = _TextAndXMLTestResult
_xml_stream = None
_testsuites_properties = {}
def __init__(self, xml_stream=None, *args, **kwargs):
"""Initialize a TextAndXMLTestRunner.
Args:
xml_stream: file-like or None; XML-formatted test results are output
via this object's write() method. If None (the default), the
new instance behaves as described in the set_default_xml_stream method
documentation below.
*args: passed unmodified to unittest.TextTestRunner.__init__.
**kwargs: passed unmodified to unittest.TextTestRunner.__init__.
"""
super(TextAndXMLTestRunner, self).__init__(*args, **kwargs)
if xml_stream is not None:
self._xml_stream = xml_stream
# else, do not set self._xml_stream to None -- this allows implicit fallback
# to the class attribute's value.
@classmethod
def set_default_xml_stream(cls, xml_stream):
"""Sets the default XML stream for the class.
Args:
xml_stream: file-like or None; used for instances when xml_stream is None
or not passed to their constructors. If None is passed, instances
created with xml_stream=None will act as ordinary TextTestRunner
instances; this is the default state before any calls to this method
have been made.
"""
cls._xml_stream = xml_stream
def _makeResult(self):
if self._xml_stream is None:
return super(TextAndXMLTestRunner, self)._makeResult()
else:
return self._TEST_RESULT_CLASS(
self._xml_stream, self.stream, self.descriptions, self.verbosity,
testsuites_properties=self._testsuites_properties)
@classmethod
def set_testsuites_property(cls, key, value):
cls._testsuites_properties[key] = value