From c48fe567f796601a422aa1b611406ab034ed330a Mon Sep 17 00:00:00 2001 From: CPython Devleopers <> Date: Tue, 21 Feb 2023 13:40:09 +0900 Subject: [PATCH 1/5] Update test.support from CPython 3.11.2 --- Lib/test/support/__init__.py | 241 +++++++++++++++++++++------ Lib/test/support/import_helper.py | 46 ++++- Lib/test/support/os_helper.py | 106 +++++++++++- Lib/test/support/script_helper.py | 8 + Lib/test/support/socket_helper.py | 9 +- Lib/test/support/testresult.py | 2 +- Lib/test/support/threading_helper.py | 35 ++++ Lib/test/support/warnings_helper.py | 8 + Lib/test/test_support.py | 44 +++-- 9 files changed, 410 insertions(+), 89 deletions(-) diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 3ce3a0707f..e9736cd5ba 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -5,6 +5,7 @@ import contextlib import functools +import getpass import os import re import stat @@ -39,18 +40,21 @@ "requires_gzip", "requires_bz2", "requires_lzma", "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", "requires_IEEE_754", "requires_zlib", + "has_fork_support", "requires_fork", + "has_subprocess_support", "requires_subprocess", + "has_socket_support", "requires_working_socket", "anticipate_failure", "load_package_tests", "detect_api_mismatch", "check__all__", "skip_if_buggy_ucrt_strfptime", "check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer", # sys - "is_jython", "is_android", "check_impl_detail", "unix_shell", - "setswitchinterval", + "is_jython", "is_android", "is_emscripten", "is_wasi", + "check_impl_detail", "unix_shell", "setswitchinterval", # network "open_urlresource", # processes "reap_children", # miscellaneous - "run_with_locale", "swap_item", "findfile", + "run_with_locale", "swap_item", "findfile", "infinite_recursion", "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict", "run_with_tz", "PGO", "missing_compiler_executable", "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST", @@ -99,6 +103,13 @@ # option. LONG_TIMEOUT = 5 * 60.0 +# TEST_HOME_DIR refers to the top level directory of the "test" package +# that contains Python's regression test suite +TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__)) +TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR) +STDLIB_DIR = os.path.dirname(TEST_HOME_DIR) +REPO_ROOT = os.path.dirname(STDLIB_DIR) + class Error(Exception): """Base class for regression test exceptions.""" @@ -148,9 +159,7 @@ def load_tests(*args): """ if pattern is None: pattern = "test*" - top_dir = os.path.dirname( # Lib - os.path.dirname( # test - os.path.dirname(__file__))) # support + top_dir = STDLIB_DIR package_tests = loader.discover(start_dir=pkg_dir, top_level_dir=top_dir, pattern=pattern) @@ -190,6 +199,11 @@ def get_original_stdout(): def _force_run(path, func, *args): try: return func(*args) + except FileNotFoundError as err: + # chmod() won't fix a missing file. + if verbose >= 2: + print('%s: %s' % (err.__class__.__name__, err)) + raise except OSError as err: if verbose >= 2: print('%s: %s' % (err.__class__.__name__, err)) @@ -290,6 +304,8 @@ def requires(resource, msg=None): if msg is None: msg = "Use of the %r resource not enabled" % resource raise ResourceDenied(msg) + if resource in {"network", "urlfetch"} and not has_socket_support: + raise ResourceDenied("No socket support") if resource == 'gui' and not _is_gui_available(): raise ResourceDenied(_is_gui_available.reason) @@ -367,6 +383,17 @@ def wrapper(*args, **kw): return decorator +def skip_if_buildbot(reason=None): + """Decorator raising SkipTest if running on a buildbot.""" + if not reason: + reason = 'not suitable for buildbots' + try: + isbuildbot = getpass.getuser().lower() == 'buildbot' + except (KeyError, EnvironmentError) as err: + warnings.warn(f'getpass.getuser() failed {err}.', RuntimeWarning) + isbuildbot = False + return unittest.skipIf(isbuildbot, reason) + def check_sanitizer(*, address=False, memory=False, ub=False): """Returns True if Python is compiled with sanitizer support""" if not (address or memory or ub): @@ -462,6 +489,17 @@ def requires_lzma(reason='requires lzma'): lzma = None return unittest.skipUnless(lzma, reason) +def has_no_debug_ranges(): + try: + import _testinternalcapi + except ImportError: + raise unittest.SkipTest("_testinternalcapi required") + config = _testinternalcapi.get_config() + return not bool(config['code_debug_ranges']) + +def requires_debug_ranges(reason='requires co_positions / debug_ranges'): + return unittest.skipIf(has_no_debug_ranges(), reason) + requires_legacy_unicode_capi = unittest.skipUnless(unicode_legacy_string, 'requires legacy Unicode C API') @@ -474,6 +512,46 @@ def requires_lzma(reason='requires lzma'): else: unix_shell = None +# wasm32-emscripten and -wasi are POSIX-like but do not +# have subprocess or fork support. +is_emscripten = sys.platform == "emscripten" +is_wasi = sys.platform == "wasi" + +has_fork_support = hasattr(os, "fork") and not is_emscripten and not is_wasi + +def requires_fork(): + return unittest.skipUnless(has_fork_support, "requires working os.fork()") + +has_subprocess_support = not is_emscripten and not is_wasi + +def requires_subprocess(): + """Used for subprocess, os.spawn calls, fd inheritance""" + return unittest.skipUnless(has_subprocess_support, "requires subprocess support") + +# Emscripten's socket emulation and WASI sockets have limitations. +has_socket_support = not is_emscripten and not is_wasi + +def requires_working_socket(*, module=False): + """Skip tests or modules that require working sockets + + Can be used as a function/class decorator or to skip an entire module. + """ + msg = "requires socket support" + if module: + if not has_socket_support: + raise unittest.SkipTest(msg) + else: + return unittest.skipUnless(has_socket_support, msg) + +# Does strftime() support glibc extension like '%4Y'? +has_strftime_extensions = False +if sys.platform != "win32": + # bpo-47037: Windows debug builds crash with "Debug Assertion Failed" + try: + has_strftime_extensions = time.strftime("%4Y") != "%4Y" + except ValueError: + pass + # Define the URL of a dedicated HTTP server for the network tests. # The URL must use clear-text HTTP: no redirection to encrypted HTTPS. TEST_HTTP_URL = "http://www.pythontest.net" @@ -486,11 +564,6 @@ def requires_lzma(reason='requires lzma'): # PGO task. If this is True, PGO is also True. PGO_EXTENDED = False -# TEST_HOME_DIR refers to the top level directory of the "test" package -# that contains Python's regression test suite -TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__)) -TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR) - # TEST_DATA_DIR is used as a target download location for remote resources TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data") @@ -712,7 +785,10 @@ def calcvobjsize(fmt): _TPFLAGS_HEAPTYPE = 1<<9 def check_sizeof(test, o, size): - import _testinternalcapi + try: + import _testinternalcapi + except ImportError: + raise unittest.SkipTest("_testinternalcapi required") result = sys.getsizeof(o) # add GC header size if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ @@ -728,29 +804,29 @@ def check_sizeof(test, o, size): @contextlib.contextmanager def run_with_locale(catstr, *locales): + try: + import locale + category = getattr(locale, catstr) + orig_locale = locale.setlocale(category) + except AttributeError: + # if the test author gives us an invalid category string + raise + except: + # cannot retrieve original locale, so do nothing + locale = orig_locale = None + else: + for loc in locales: try: - import locale - category = getattr(locale, catstr) - orig_locale = locale.setlocale(category) - except AttributeError: - # if the test author gives us an invalid category string - raise + locale.setlocale(category, loc) + break except: - # cannot retrieve original locale, so do nothing - locale = orig_locale = None - else: - for loc in locales: - try: - locale.setlocale(category, loc) - break - except: - pass + pass - try: - yield - finally: - if locale and orig_locale: - locale.setlocale(category, orig_locale) + try: + yield + finally: + if locale and orig_locale: + locale.setlocale(category, orig_locale) #======================================================================= # Decorator for running a function in a specific timezone, correctly @@ -1133,17 +1209,18 @@ def match_test_regex(test_id): def run_unittest(*classes): """Run tests from unittest.TestCase-derived classes.""" valid_types = (unittest.TestSuite, unittest.TestCase) + loader = unittest.TestLoader() suite = unittest.TestSuite() for cls in classes: if isinstance(cls, str): if cls in sys.modules: - suite.addTest(unittest.findTestCases(sys.modules[cls])) + suite.addTest(loader.loadTestsFromModule(sys.modules[cls])) else: raise ValueError("str arguments must be keys in sys.modules") elif isinstance(cls, valid_types): suite.addTest(cls) else: - suite.addTest(unittest.makeSuite(cls)) + suite.addTest(loader.loadTestsFromTestCase(cls)) _filter_suite(suite, match_test) _run_suite(suite) @@ -1197,11 +1274,24 @@ def run_doctest(module, verbosity=None, optionflags=0): #======================================================================= # Support for saving and restoring the imported modules. +def flush_std_streams(): + if sys.stdout is not None: + sys.stdout.flush() + if sys.stderr is not None: + sys.stderr.flush() + + def print_warning(msg): - # bpo-39983: Print into sys.__stderr__ to display the warning even - # when sys.stderr is captured temporarily by a test + # bpo-45410: Explicitly flush stdout to keep logs in order + flush_std_streams() + stream = print_warning.orig_stderr for line in msg.splitlines(): - print(f"Warning -- {line}", file=sys.__stderr__, flush=True) + print(f"Warning -- {line}", file=stream) + stream.flush() + +# bpo-39983: Store the original sys.stderr at Python startup to be able to +# log warnings even if sys.stderr is captured temporarily by a test. +print_warning.orig_stderr = sys.stderr # Flag used by saved_test_environment of test.libregrtest.save_env, @@ -1223,6 +1313,8 @@ def reap_children(): # Need os.waitpid(-1, os.WNOHANG): Windows is not supported if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')): return + elif not has_subprocess_support: + return # Reap all our dead child processes so we don't leave zombies around. # These hog resources and might be causing some of the buildbots to die. @@ -1364,7 +1456,7 @@ def skip_if_buggy_ucrt_strfptime(test): global _buggy_ucrt if _buggy_ucrt is None: if(sys.platform == 'win32' and - locale.getdefaultlocale()[1] == 'cp65001' and + locale.getencoding() == 'cp65001' and time.localtime().tm_zone == ''): _buggy_ucrt = True else: @@ -1410,8 +1502,8 @@ def _platform_specific(self): self._env = {k.upper(): os.getenv(k) for k in os.environ} self._env["PYTHONHOME"] = os.path.dirname(self.real) - if sysconfig.is_python_build(True): - self._env["PYTHONPATH"] = os.path.dirname(os.__file__) + if sysconfig.is_python_build(): + self._env["PYTHONPATH"] = STDLIB_DIR else: def _platform_specific(self): pass @@ -1685,6 +1777,16 @@ def cleanup(): setattr(object_to_patch, attr_name, new_value) +@contextlib.contextmanager +def patch_list(orig): + """Like unittest.mock.patch.dict, but for lists.""" + try: + saved = orig[:] + yield + finally: + orig[:] = saved + + def run_in_subinterp(code): """ Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc @@ -1981,7 +2083,7 @@ def wait_process(pid, *, exitcode, timeout=None): Raise an AssertionError if the process exit code is not equal to exitcode. - If the process runs longer than timeout seconds (SHORT_TIMEOUT by default), + If the process runs longer than timeout seconds (LONG_TIMEOUT by default), kill the process (if signal.SIGKILL is available) and raise an AssertionError. The timeout feature is not available on Windows. """ @@ -1989,7 +2091,7 @@ def wait_process(pid, *, exitcode, timeout=None): import signal if timeout is None: - timeout = SHORT_TIMEOUT + timeout = LONG_TIMEOUT t0 = time.monotonic() sleep = 0.001 max_sleep = 0.1 @@ -2000,7 +2102,7 @@ def wait_process(pid, *, exitcode, timeout=None): # process is still running dt = time.monotonic() - t0 - if dt > SHORT_TIMEOUT: + if dt > timeout: try: os.kill(pid, signal.SIGKILL) os.waitpid(pid, 0) @@ -2051,16 +2153,6 @@ def skip_if_broken_multiprocessing_synchronize(): raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}") -@contextlib.contextmanager -def infinite_recursion(max_depth=75): - original_depth = sys.getrecursionlimit() - try: - sys.setrecursionlimit(max_depth) - yield - finally: - sys.setrecursionlimit(original_depth) - - def check_disallow_instantiation(testcase, tp, *args, **kwds): """ Check that given type cannot be instantiated using *args and **kwds. @@ -2076,6 +2168,20 @@ def check_disallow_instantiation(testcase, tp, *args, **kwds): msg = f"cannot create '{re.escape(qualname)}' instances" testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds) +@contextlib.contextmanager +def infinite_recursion(max_depth=75): + """Set a lower limit for tests that interact with infinite recursions + (e.g test_ast.ASTHelpers_Test.test_recursion_direct) since on some + debug windows builds, due to not enough functions being inlined the + stack size might not handle the default recursion limit (1000). See + bpo-11105 for details.""" + + original_depth = sys.getrecursionlimit() + try: + sys.setrecursionlimit(max_depth) + yield + finally: + sys.setrecursionlimit(original_depth) def ignore_deprecations_from(module: str, *, like: str) -> object: token = object() @@ -2087,7 +2193,6 @@ def ignore_deprecations_from(module: str, *, like: str) -> object: ) return token - def clear_ignored_deprecations(*tokens: object) -> None: if not tokens: raise ValueError("Provide token or tokens returned by ignore_deprecations_from") @@ -2106,3 +2211,31 @@ def clear_ignored_deprecations(*tokens: object) -> None: if warnings.filters != new_filters: warnings.filters[:] = new_filters warnings._filters_mutated() + + +# Skip a test if venv with pip is known to not work. +def requires_venv_with_pip(): + # ensurepip requires zlib to open ZIP archives (.whl binary wheel packages) + try: + import zlib + except ImportError: + return unittest.skipIf(True, "venv: ensurepip requires zlib") + + # bpo-26610: pip/pep425tags.py requires ctypes. + # gh-92820: setuptools/windows_support.py uses ctypes (setuptools 58.1). + try: + import ctypes + except ImportError: + ctypes = None + return unittest.skipUnless(ctypes, 'venv: pip requires ctypes') + + +@contextlib.contextmanager +def adjust_int_max_str_digits(max_digits): + """Temporarily change the integer string conversion length limit.""" + current = sys.get_int_max_str_digits() + try: + sys.set_int_max_str_digits(max_digits) + yield + finally: + sys.set_int_max_str_digits(current) diff --git a/Lib/test/support/import_helper.py b/Lib/test/support/import_helper.py index efa8ffad6a..5201dc84cf 100644 --- a/Lib/test/support/import_helper.py +++ b/Lib/test/support/import_helper.py @@ -1,4 +1,5 @@ import contextlib +import _imp import importlib import importlib.util import os @@ -90,7 +91,24 @@ def _save_and_remove_modules(names): return orig_modules -def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): +@contextlib.contextmanager +def frozen_modules(enabled=True): + """Force frozen modules to be used (or not). + + This only applies to modules that haven't been imported yet. + Also, some essential modules will always be imported frozen. + """ + _imp._override_frozen_modules_for_tests(1 if enabled else -1) + try: + yield + finally: + _imp._override_frozen_modules_for_tests(0) + + +def import_fresh_module(name, fresh=(), blocked=(), *, + deprecated=False, + usefrozen=False, + ): """Import and return a module, deliberately bypassing sys.modules. This function imports and returns a fresh copy of the named Python module @@ -115,6 +133,9 @@ def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): This function will raise ImportError if the named module cannot be imported. + + If "usefrozen" is False (the default) then the frozen importer is + disabled (except for essential modules like importlib._bootstrap). """ # NOTE: test_heapq, test_json and test_warnings include extra sanity checks # to make sure that this utility function is working as expected @@ -129,13 +150,14 @@ def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): sys.modules[modname] = None try: - # Return None when one of the "fresh" modules can not be imported. - try: - for modname in fresh: - __import__(modname) - except ImportError: - return None - return importlib.import_module(name) + with frozen_modules(usefrozen): + # Return None when one of the "fresh" modules can not be imported. + try: + for modname in fresh: + __import__(modname) + except ImportError: + return None + return importlib.import_module(name) finally: _save_and_remove_modules(names) sys.modules.update(orig_modules) @@ -151,9 +173,12 @@ class CleanImport(object): with CleanImport("foo"): importlib.import_module("foo") # new reference + + If "usefrozen" is False (the default) then the frozen importer is + disabled (except for essential modules like importlib._bootstrap). """ - def __init__(self, *module_names): + def __init__(self, *module_names, usefrozen=False): self.original_modules = sys.modules.copy() for module_name in module_names: if module_name in sys.modules: @@ -165,12 +190,15 @@ def __init__(self, *module_names): if module.__name__ != module_name: del sys.modules[module.__name__] del sys.modules[module_name] + self._frozen_modules = frozen_modules(usefrozen) def __enter__(self): + self._frozen_modules.__enter__() return self def __exit__(self, *ignore_exc): sys.modules.update(self.original_modules) + self._frozen_modules.__exit__(*ignore_exc) class DirsOnSysPath(object): diff --git a/Lib/test/support/os_helper.py b/Lib/test/support/os_helper.py index 82a6de789c..f599cc7521 100644 --- a/Lib/test/support/os_helper.py +++ b/Lib/test/support/os_helper.py @@ -49,8 +49,8 @@ 'encoding (%s). Unicode filename tests may not be effective' % (TESTFN_UNENCODABLE, sys.getfilesystemencoding())) TESTFN_UNENCODABLE = None -# Mac OS X denies unencodable filenames (invalid utf-8) -elif sys.platform != 'darwin': +# macOS and Emscripten deny unencodable filenames (invalid utf-8) +elif sys.platform not in {'darwin', 'emscripten', 'wasi'}: try: # ascii and utf-8 cannot encode the byte 0xff b'\xff'.decode(sys.getfilesystemencoding()) @@ -171,9 +171,13 @@ def can_symlink(): global _can_symlink if _can_symlink is not None: return _can_symlink - symlink_path = TESTFN + "can_symlink" + # WASI / wasmtime prevents symlinks with absolute paths, see man + # openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute + # paths. Skip symlink tests on WASI for now. + src = os.path.abspath(TESTFN) + symlink_path = src + "can_symlink" try: - os.symlink(TESTFN, symlink_path) + os.symlink(src, symlink_path) can = True except (OSError, NotImplementedError, AttributeError): can = False @@ -233,6 +237,84 @@ def skip_unless_xattr(test): return test if ok else unittest.skip(msg)(test) +_can_chmod = None + +def can_chmod(): + global _can_chmod + if _can_chmod is not None: + return _can_chmod + if not hasattr(os, "chown"): + _can_chmod = False + return _can_chmod + try: + with open(TESTFN, "wb") as f: + try: + os.chmod(TESTFN, 0o777) + mode1 = os.stat(TESTFN).st_mode + os.chmod(TESTFN, 0o666) + mode2 = os.stat(TESTFN).st_mode + except OSError as e: + can = False + else: + can = stat.S_IMODE(mode1) != stat.S_IMODE(mode2) + finally: + unlink(TESTFN) + _can_chmod = can + return can + + +def skip_unless_working_chmod(test): + """Skip tests that require working os.chmod() + + WASI SDK 15.0 cannot change file mode bits. + """ + ok = can_chmod() + msg = "requires working os.chmod()" + return test if ok else unittest.skip(msg)(test) + + +# Check whether the current effective user has the capability to override +# DAC (discretionary access control). Typically user root is able to +# bypass file read, write, and execute permission checks. The capability +# is independent of the effective user. See capabilities(7). +_can_dac_override = None + +def can_dac_override(): + global _can_dac_override + + if not can_chmod(): + _can_dac_override = False + if _can_dac_override is not None: + return _can_dac_override + + try: + with open(TESTFN, "wb") as f: + os.chmod(TESTFN, 0o400) + try: + with open(TESTFN, "wb"): + pass + except OSError: + _can_dac_override = False + else: + _can_dac_override = True + finally: + unlink(TESTFN) + + return _can_dac_override + + +def skip_if_dac_override(test): + ok = not can_dac_override() + msg = "incompatible with CAP_DAC_OVERRIDE" + return test if ok else unittest.skip(msg)(test) + + +def skip_unless_dac_override(test): + ok = can_dac_override() + msg = "requires CAP_DAC_OVERRIDE" + return test if ok else unittest.skip(msg)(test) + + def unlink(filename): try: _unlink(filename) @@ -459,7 +541,10 @@ def create_empty_file(filename): def open_dir_fd(path): """Open a file descriptor to a directory.""" assert os.path.isdir(path) - dir_fd = os.open(path, os.O_RDONLY) + flags = os.O_RDONLY + if hasattr(os, "O_DIRECTORY"): + flags |= os.O_DIRECTORY + dir_fd = os.open(path, flags) try: yield dir_fd finally: @@ -502,7 +587,7 @@ def __fspath__(self): def fd_count(): """Count the number of open file descriptors. """ - if sys.platform.startswith(('linux', 'freebsd')): + if sys.platform.startswith(('linux', 'freebsd', 'emscripten')): try: names = os.listdir("/proc/self/fd") # Subtract one because listdir() internally opens a file @@ -568,6 +653,11 @@ def temp_umask(umask): yield finally: os.umask(oldmask) +else: + @contextlib.contextmanager + def temp_umask(umask): + """no-op on platforms without umask()""" + yield class EnvironmentVarGuard(collections.abc.MutableMapping): @@ -610,6 +700,10 @@ def set(self, envvar, value): def unset(self, envvar): del self[envvar] + def copy(self): + # We do what os.environ.copy() does. + return dict(self) + def __enter__(self): return self diff --git a/Lib/test/support/script_helper.py b/Lib/test/support/script_helper.py index 6d699c8486..c2b43f4060 100644 --- a/Lib/test/support/script_helper.py +++ b/Lib/test/support/script_helper.py @@ -42,6 +42,10 @@ def interpreter_requires_environment(): if 'PYTHONHOME' in os.environ: __cached_interp_requires_environment = True return True + # cannot run subprocess, assume we don't need it + if not support.has_subprocess_support: + __cached_interp_requires_environment = False + return False # Try running an interpreter with -E to see if it works or not. try: @@ -87,6 +91,7 @@ def fail(self, cmd_line): # Executing the interpreter in a subprocess +@support.requires_subprocess() def run_python_until_end(*args, **env_vars): env_required = interpreter_requires_environment() cwd = env_vars.pop('__cwd', None) @@ -139,6 +144,7 @@ def run_python_until_end(*args, **env_vars): return _PythonRunResult(rc, out, err), cmd_line +@support.requires_subprocess() def _assert_python(expected_success, /, *args, **env_vars): res, cmd_line = run_python_until_end(*args, **env_vars) if (res.rc and expected_success) or (not res.rc and not expected_success): @@ -171,6 +177,7 @@ def assert_python_failure(*args, **env_vars): return _assert_python(False, *args, **env_vars) +@support.requires_subprocess() def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw): """Run a Python subprocess with the given arguments. @@ -273,6 +280,7 @@ def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, return zip_name, os.path.join(zip_name, script_name_in_zip) +@support.requires_subprocess() def run_test_script(script): # use -u to try to get the full output if the test hangs or crash if support.verbose: diff --git a/Lib/test/support/socket_helper.py b/Lib/test/support/socket_helper.py index b51677383e..42b2a93398 100644 --- a/Lib/test/support/socket_helper.py +++ b/Lib/test/support/socket_helper.py @@ -5,12 +5,15 @@ import sys from .. import support - +from . import warnings_helper HOST = "localhost" HOSTv4 = "127.0.0.1" HOSTv6 = "::1" +# WASI SDK 15.0 does not provide gethostname, stub raises OSError ENOTSUP. +has_gethostname = not support.is_wasi + def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): """Returns an unused port that should be suitable for binding. This is @@ -190,7 +193,7 @@ def get_socket_conn_refused_errs(): def transient_internet(resource_name, *, timeout=_NOT_SET, errnos=()): """Return a context manager that raises ResourceDenied when various issues with the internet connection manifest themselves as exceptions.""" - import nntplib + nntplib = warnings_helper.import_deprecated("nntplib") import urllib.error if timeout is _NOT_SET: timeout = support.INTERNET_TIMEOUT @@ -256,7 +259,7 @@ def filter_error(err): err = a[0] # The error can also be wrapped as args[1]: # except socket.error as msg: - # raise OSError('socket error', msg).with_traceback(sys.exc_info()[2]) + # raise OSError('socket error', msg) from msg elif len(a) >= 2 and isinstance(a[1], OSError): err = a[1] else: diff --git a/Lib/test/support/testresult.py b/Lib/test/support/testresult.py index 6f2edda0f5..2cd1366cd8 100644 --- a/Lib/test/support/testresult.py +++ b/Lib/test/support/testresult.py @@ -173,7 +173,7 @@ def test_error(self): raise RuntimeError('error message') suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(TestTests)) + suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestTests)) stream = io.StringIO() runner_cls = get_test_runner_class(sum(a == '-v' for a in sys.argv)) runner = runner_cls(sys.stdout) diff --git a/Lib/test/support/threading_helper.py b/Lib/test/support/threading_helper.py index 92a64e8354..26cbc6f4d2 100644 --- a/Lib/test/support/threading_helper.py +++ b/Lib/test/support/threading_helper.py @@ -4,6 +4,7 @@ import sys import threading import time +import unittest from test import support @@ -207,3 +208,37 @@ def __exit__(self, *exc_info): del self.exc_value del self.exc_traceback del self.thread + + +def _can_start_thread() -> bool: + """Detect whether Python can start new threads. + + Some WebAssembly platforms do not provide a working pthread + implementation. Thread support is stubbed and any attempt + to create a new thread fails. + + - wasm32-wasi does not have threading. + - wasm32-emscripten can be compiled with or without pthread + support (-s USE_PTHREADS / __EMSCRIPTEN_PTHREADS__). + """ + if sys.platform == "emscripten": + return sys._emscripten_info.pthreads + elif sys.platform == "wasi": + return False + else: + # assume all other platforms have working thread support. + return True + +can_start_thread = _can_start_thread() + +def requires_working_threading(*, module=False): + """Skip tests or modules that require working threading. + + Can be used as a function/class decorator or to skip an entire module. + """ + msg = "requires threading support" + if module: + if not can_start_thread: + raise unittest.SkipTest(msg) + else: + return unittest.skipUnless(can_start_thread, msg) diff --git a/Lib/test/support/warnings_helper.py b/Lib/test/support/warnings_helper.py index a024fbe5be..28e96f88b2 100644 --- a/Lib/test/support/warnings_helper.py +++ b/Lib/test/support/warnings_helper.py @@ -1,10 +1,18 @@ import contextlib import functools +import importlib import re import sys import warnings +def import_deprecated(name): + """Import *name* while suppressing DeprecationWarning.""" + with warnings.catch_warnings(): + warnings.simplefilter('ignore', category=DeprecationWarning) + return importlib.import_module(name) + + def check_syntax_warning(testcase, statement, errtext='', *, lineno=1, offset=None): # Test also that a warning is emitted only once. diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index b7a1db0eeb..d3ea2af281 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -123,15 +123,18 @@ def test_forget(self): os_helper.unlink(mod_filename) os_helper.rmtree('__pycache__') + @support.requires_working_socket() def test_HOST(self): s = socket.create_server((socket_helper.HOST, 0)) s.close() + @support.requires_working_socket() def test_find_unused_port(self): port = socket_helper.find_unused_port() s = socket.create_server((socket_helper.HOST, port)) s.close() + @support.requires_working_socket() def test_bind_port(self): s = socket.socket() socket_helper.bind_port(s) @@ -198,7 +201,7 @@ def test_temp_dir__existing_dir__quiet_true(self): f'temporary directory {path!r}: '), warn) - @unittest.skipUnless(hasattr(os, "fork"), "test requires os.fork") + @support.requires_fork() def test_temp_dir__forked_child(self): """Test that a forked child process does not remove the directory.""" # See bpo-30028 for details. @@ -429,9 +432,14 @@ def test_check__all__(self): extra=extra, not_exported=not_exported) - extra = {'TextTestResult', 'installHandler'} + extra = { + 'TextTestResult', + 'findTestCases', + 'getTestCaseNames', + 'installHandler', + 'makeSuite', + } not_exported = {'load_tests', "TestProgram", "BaseTestSuite"} - support.check__all__(self, unittest, ("unittest.result", "unittest.case", @@ -447,6 +455,7 @@ def test_check__all__(self): @unittest.expectedFailure @unittest.skipUnless(hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG'), 'need os.waitpid() and os.WNOHANG') + @support.requires_fork() def test_reap_children(self): # Make sure that there is no other pending child process support.reap_children() @@ -469,12 +478,8 @@ def test_reap_children(self): if time.monotonic() > deadline: self.fail("timeout") - old_stderr = sys.__stderr__ - try: - sys.__stderr__ = stderr + with support.swap_attr(support.print_warning, 'orig_stderr', stderr): support.reap_children() - finally: - sys.__stderr__ = old_stderr # Use environment_altered to check if reap_children() found # the child process @@ -494,6 +499,7 @@ def test_reap_children(self): # pending child process support.reap_children() + @support.requires_subprocess() def check_options(self, args, func, expected=None): code = f'from test.support import {func}; print(repr({func}()))' cmd = [sys.executable, *args, '-c', code] @@ -521,6 +527,7 @@ def test_args_from_interpreter_flags(self): ['-E'], ['-v'], ['-b'], + ['-P'], ['-q'], ['-I'], # same option multiple times @@ -540,7 +547,8 @@ def test_args_from_interpreter_flags(self): with self.subTest(opts=opts): self.check_options(opts, 'args_from_interpreter_flags') - self.check_options(['-I', '-E', '-s'], 'args_from_interpreter_flags', + self.check_options(['-I', '-E', '-s', '-P'], + 'args_from_interpreter_flags', ['-I']) def test_optim_args_from_interpreter_flags(self): @@ -661,10 +669,14 @@ def id(self): self.assertTrue(support.match_test(test_chdir)) @unittest.skipIf(sys.platform.startswith("win"), "TODO: RUSTPYTHON; os.dup on windows") + @unittest.skipIf(support.is_emscripten, "Unstable in Emscripten") + @unittest.skipIf(support.is_wasi, "Unavailable on WASI") def test_fd_count(self): # We cannot test the absolute value of fd_count(): on old Linux # kernel or glibc versions, os.urandom() keeps a FD open on # /dev/urandom device and Python has 4 FD opens instead of 3. + # Test is unstable on Emscripten. The platform starts and stops + # background threads that use pipes and epoll fds. start = os_helper.fd_count() fd = os.open(__file__, os.O_RDONLY) try: @@ -675,14 +687,8 @@ def test_fd_count(self): def check_print_warning(self, msg, expected): stderr = io.StringIO() - - old_stderr = sys.__stderr__ - try: - sys.__stderr__ = stderr + with support.swap_attr(support.print_warning, 'orig_stderr', stderr): support.print_warning(msg) - finally: - sys.__stderr__ = old_stderr - self.assertEqual(stderr.getvalue(), expected) def test_print_warning(self): @@ -691,6 +697,12 @@ def test_print_warning(self): self.check_print_warning("a\nb", 'Warning -- a\nWarning -- b\n') + def test_has_strftime_extensions(self): + if support.is_emscripten or sys.platform == "win32": + self.assertFalse(support.has_strftime_extensions) + else: + self.assertTrue(support.has_strftime_extensions) + # XXX -follows a list of untested API # make_legacy_pyc # is_resource_enabled From cfa9de40479bcf3a7a07a6dec2522ae1fbf7b3f5 Mon Sep 17 00:00:00 2001 From: Jeong YunWon Date: Wed, 22 Feb 2023 00:26:34 +0900 Subject: [PATCH 2/5] Mark newly failing tests --- Lib/test/test_support.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index d3ea2af281..18d4155a00 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -515,6 +515,8 @@ def check_options(self, args, func, expected=None): self.assertEqual(proc.stdout.rstrip(), repr(expected)) self.assertEqual(proc.returncode, 0) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_args_from_interpreter_flags(self): # Test test.support.args_from_interpreter_flags() for opts in ( @@ -703,6 +705,11 @@ def test_has_strftime_extensions(self): else: self.assertTrue(support.has_strftime_extensions) + # TODO: RUSTPYTHON + if not sys.platform.startswith("win"): + # TODO: RUSTPYTHON + test_has_strftime_extensions = unittest.expectedFailure(test_has_strftime_extensions) + # XXX -follows a list of untested API # make_legacy_pyc # is_resource_enabled From 7de056437f1d611219abfa5316b19e8161dc1261 Mon Sep 17 00:00:00 2001 From: CPython Devleopers <> Date: Fri, 24 Feb 2023 01:15:34 +0900 Subject: [PATCH 3/5] Update locale from CPython 3.11.2 --- Lib/locale.py | 136 ++++++++++++++++++++++++++-------------- Lib/test/test_locale.py | 64 ++++++++++++++++--- 2 files changed, 145 insertions(+), 55 deletions(-) diff --git a/Lib/locale.py b/Lib/locale.py index f3d3973d03..7a7694e1bf 100644 --- a/Lib/locale.py +++ b/Lib/locale.py @@ -28,7 +28,7 @@ "setlocale", "resetlocale", "localeconv", "strcoll", "strxfrm", "str", "atof", "atoi", "format", "format_string", "currency", "normalize", "LC_CTYPE", "LC_COLLATE", "LC_TIME", "LC_MONETARY", - "LC_NUMERIC", "LC_ALL", "CHAR_MAX"] + "LC_NUMERIC", "LC_ALL", "CHAR_MAX", "getencoding"] def _strcoll(a,b): """ strcoll(string,string) -> int. @@ -185,8 +185,14 @@ def _format(percent, value, grouping=False, monetary=False, *additional): formatted = percent % ((value,) + additional) else: formatted = percent % value + if percent[-1] in 'eEfFgGdiu': + formatted = _localize(formatted, grouping, monetary) + return formatted + +# Transform formatted as locale number according to the locale settings +def _localize(formatted, grouping=False, monetary=False): # floats and decimal ints need special action! - if percent[-1] in 'eEfFgG': + if '.' in formatted: seps = 0 parts = formatted.split('.') if grouping: @@ -196,7 +202,7 @@ def _format(percent, value, grouping=False, monetary=False, *additional): formatted = decimal_point.join(parts) if seps: formatted = _strip_padding(formatted, seps) - elif percent[-1] in 'diu': + else: seps = 0 if grouping: formatted, seps = _group(formatted, monetary=monetary) @@ -267,7 +273,7 @@ def currency(val, symbol=True, grouping=False, international=False): raise ValueError("Currency formatting is not possible using " "the 'C' locale.") - s = _format('%%.%if' % digits, abs(val), grouping, monetary=True) + s = _localize(f'{abs(val):.{digits}f}', grouping, monetary=True) # '<' and '>' are markers if the sign must be inserted between symbol and value s = '<' + s + '>' @@ -279,6 +285,8 @@ def currency(val, symbol=True, grouping=False, international=False): if precedes: s = smb + (separated and ' ' or '') + s else: + if international and smb[-1] == ' ': + smb = smb[:-1] s = s + (separated and ' ' or '') + smb sign_pos = conv[val<0 and 'n_sign_posn' or 'p_sign_posn'] @@ -321,6 +329,10 @@ def delocalize(string): string = string.replace(dd, '.') return string +def localize(string, grouping=False, monetary=False): + """Parses a string as locale number according to the locale settings.""" + return _localize(string, grouping, monetary) + def atof(string, func=float): "Parses a string as a float according to the locale settings." return func(delocalize(string)) @@ -492,6 +504,10 @@ def _parse_localename(localename): return tuple(code.split('.')[:2]) elif code == 'C': return None, None + elif code == 'UTF-8': + # On macOS "LC_CTYPE=UTF-8" is a valid locale setting + # for getting UTF-8 handling for text. + return None, 'UTF-8' raise ValueError('unknown locale: %s' % localename) def _build_localename(localetuple): @@ -539,6 +555,12 @@ def getdefaultlocale(envvars=('LC_ALL', 'LC_CTYPE', 'LANG', 'LANGUAGE')): """ + import warnings + warnings.warn( + "Use setlocale(), getencoding() and getlocale() instead", + DeprecationWarning, stacklevel=2 + ) + try: # check if it's supported by the _locale module import _locale @@ -611,55 +633,72 @@ def resetlocale(category=LC_ALL): getdefaultlocale(). category defaults to LC_ALL. """ - _setlocale(category, _build_localename(getdefaultlocale())) + import warnings + warnings.warn( + 'Use locale.setlocale(locale.LC_ALL, "") instead', + DeprecationWarning, stacklevel=2 + ) + + with warnings.catch_warnings(): + warnings.simplefilter('ignore', category=DeprecationWarning) + loc = getdefaultlocale() + + _setlocale(category, _build_localename(loc)) + + +try: + from _locale import getencoding +except ImportError: + def getencoding(): + if hasattr(sys, 'getandroidapilevel'): + # On Android langinfo.h and CODESET are missing, and UTF-8 is + # always used in mbstowcs() and wcstombs(). + return 'utf-8' + encoding = getdefaultlocale()[1] + if encoding is None: + # LANG not set, default to UTF-8 + encoding = 'utf-8' + return encoding -if sys.platform.startswith("win"): - # On Win32, this will return the ANSI code page - def getpreferredencoding(do_setlocale = True): +try: + CODESET +except NameError: + def getpreferredencoding(do_setlocale=True): """Return the charset that the user is likely using.""" + if sys.flags.warn_default_encoding: + import warnings + warnings.warn( + "UTF-8 Mode affects locale.getpreferredencoding(). Consider locale.getencoding() instead.", + EncodingWarning, 2) if sys.flags.utf8_mode: - return 'UTF-8' - import _bootlocale - return _bootlocale.getpreferredencoding(False) + return 'utf-8' + return getencoding() else: # On Unix, if CODESET is available, use that. - try: - CODESET - except NameError: - if hasattr(sys, 'getandroidapilevel'): - # On Android langinfo.h and CODESET are missing, and UTF-8 is - # always used in mbstowcs() and wcstombs(). - def getpreferredencoding(do_setlocale = True): - return 'UTF-8' - else: - # Fall back to parsing environment variables :-( - def getpreferredencoding(do_setlocale = True): - """Return the charset that the user is likely using, - by looking at environment variables.""" - if sys.flags.utf8_mode: - return 'UTF-8' - res = getdefaultlocale()[1] - if res is None: - # LANG not set, default conservatively to ASCII - res = 'ascii' - return res - else: - def getpreferredencoding(do_setlocale = True): - """Return the charset that the user is likely using, - according to the system configuration.""" - if sys.flags.utf8_mode: - return 'UTF-8' - import _bootlocale - if do_setlocale: - oldloc = setlocale(LC_CTYPE) - try: - setlocale(LC_CTYPE, "") - except Error: - pass - result = _bootlocale.getpreferredencoding(False) - if do_setlocale: - setlocale(LC_CTYPE, oldloc) - return result + def getpreferredencoding(do_setlocale=True): + """Return the charset that the user is likely using, + according to the system configuration.""" + + if sys.flags.warn_default_encoding: + import warnings + warnings.warn( + "UTF-8 Mode affects locale.getpreferredencoding(). Consider locale.getencoding() instead.", + EncodingWarning, 2) + if sys.flags.utf8_mode: + return 'utf-8' + + if not do_setlocale: + return getencoding() + + old_loc = setlocale(LC_CTYPE) + try: + try: + setlocale(LC_CTYPE, "") + except Error: + pass + return getencoding() + finally: + setlocale(LC_CTYPE, old_loc) ### Database @@ -734,6 +773,7 @@ def getpreferredencoding(do_setlocale = True): for k, v in sorted(locale_encoding_alias.items()): k = k.replace('_', '') locale_encoding_alias.setdefault(k, v) +del k, v # # The locale_alias table maps lowercase alias names to C locale names diff --git a/Lib/test/test_locale.py b/Lib/test/test_locale.py index 25d883f43c..196dc6e3a1 100644 --- a/Lib/test/test_locale.py +++ b/Lib/test/test_locale.py @@ -1,4 +1,5 @@ -from test.support import verbose, is_android +from decimal import Decimal +from test.support import verbose, is_android, is_emscripten, is_wasi from test.support.warnings_helper import check_warnings import unittest import locale @@ -335,8 +336,7 @@ def test_currency(self): euro = '\u20ac' self._test_currency(50000, "50000,00 " + euro) self._test_currency(50000, "50 000,00 " + euro, grouping=True) - # XXX is the trailing space a bug? - self._test_currency(50000, "50 000,00 EUR ", + self._test_currency(50000, "50 000,00 EUR", grouping=True, international=True) @@ -367,7 +367,7 @@ class TestEnUSCollation(BaseLocalizedTest, TestCollation): locale_type = locale.LC_ALL def setUp(self): - enc = codecs.lookup(locale.getpreferredencoding(False) or 'ascii').name + enc = codecs.lookup(locale.getencoding() or 'ascii').name if enc not in ('utf-8', 'iso8859-1', 'cp1252'): raise unittest.SkipTest('encoding not suitable') if enc != 'iso8859-1' and (sys.platform == 'darwin' or is_android or @@ -377,11 +377,19 @@ def setUp(self): @unittest.skipIf(sys.platform.startswith('aix'), 'bpo-29972: broken test on AIX') + @unittest.skipIf( + is_emscripten or is_wasi, + "musl libc issue on Emscripten/WASI, bpo-46390" + ) def test_strcoll_with_diacritic(self): self.assertLess(locale.strcoll('à', 'b'), 0) @unittest.skipIf(sys.platform.startswith('aix'), 'bpo-29972: broken test on AIX') + @unittest.skipIf( + is_emscripten or is_wasi, + "musl libc issue on Emscripten/WASI, bpo-46390" + ) def test_strxfrm_with_diacritic(self): self.assertLess(locale.strxfrm('à'), locale.strxfrm('b')) @@ -502,7 +510,7 @@ class TestMiscellaneous(unittest.TestCase): @unittest.expectedFailure def test_defaults_UTF8(self): # Issue #18378: on (at least) macOS setting LC_CTYPE to "UTF-8" is - # valid. Futhermore LC_CTYPE=UTF is used by the UTF-8 locale coercing + # valid. Furthermore LC_CTYPE=UTF is used by the UTF-8 locale coercing # during interpreter startup (on macOS). import _locale import os @@ -524,7 +532,8 @@ def test_defaults_UTF8(self): os.environ['LC_CTYPE'] = 'UTF-8' - self.assertEqual(locale.getdefaultlocale(), (None, 'UTF-8')) + with check_warnings(('', DeprecationWarning)): + self.assertEqual(locale.getdefaultlocale(), (None, 'UTF-8')) finally: for k in orig_env: @@ -536,6 +545,14 @@ def test_defaults_UTF8(self): if orig_getlocale is not None: _locale._getdefaultlocale = orig_getlocale + def test_getencoding(self): + # Invoke getencoding to make sure it does not cause exceptions. + enc = locale.getencoding() + self.assertIsInstance(enc, str) + self.assertNotEqual(enc, "") + # make sure it is valid + codecs.lookup(enc) + def test_getpreferredencoding(self): # Invoke getpreferredencoding to make sure it does not cause exceptions. enc = locale.getpreferredencoding() @@ -573,7 +590,13 @@ def test_getsetlocale_issue1813(self): loc = locale.getlocale(locale.LC_CTYPE) if verbose: print('testing with %a' % (loc,), end=' ', flush=True) - locale.setlocale(locale.LC_CTYPE, loc) + try: + locale.setlocale(locale.LC_CTYPE, loc) + except locale.Error as exc: + # bpo-37945: setlocale(LC_CTYPE) fails with getlocale(LC_CTYPE) + # and the tr_TR locale on Windows. getlocale() builds a locale + # which is not recognize by setlocale(). + self.skipTest(f"setlocale(LC_CTYPE, {loc!r}) failed: {exc!r}") self.assertEqual(loc, locale.getlocale(locale.LC_CTYPE)) def test_invalid_locale_format_in_localetuple(self): @@ -639,5 +662,32 @@ def test_atoi(self): self._test_atoi('50 000', 50000) +class BaseLocalizeTest(BaseLocalizedTest): + + def _test_localize(self, value, out, grouping=False): + self.assertEqual(locale.localize(value, grouping=grouping), out) + + +class TestEnUSLocalize(EnUSCookedTest, BaseLocalizeTest): + + def test_localize(self): + self._test_localize('50000.00', '50000.00') + self._test_localize( + '{0:.16f}'.format(Decimal('1.15')), '1.1500000000000000') + + +class TestCLocalize(CCookedTest, BaseLocalizeTest): + + def test_localize(self): + self._test_localize('50000.00', '50000.00') + + +class TestfrFRLocalize(FrFRCookedTest, BaseLocalizeTest): + + def test_localize(self): + self._test_localize('50000.00', '50000,00') + self._test_localize('50000.00', '50 000,00', grouping=True) + + if __name__ == '__main__': unittest.main() From a7c3740dee576594da2ef839e93d6a4f6cbb6ab9 Mon Sep 17 00:00:00 2001 From: CPython Developers <> Date: Fri, 24 Feb 2023 02:06:13 +0900 Subject: [PATCH 4/5] Update test_utf8_mode from 3.11.2 --- Lib/test/test_utf8_mode.py | 38 +++++++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/Lib/test/test_utf8_mode.py b/Lib/test/test_utf8_mode.py index 429206dd5f..b852d54e71 100644 --- a/Lib/test/test_utf8_mode.py +++ b/Lib/test/test_utf8_mode.py @@ -3,11 +3,13 @@ """ import locale +import subprocess import sys import textwrap import unittest from test import support from test.support.script_helper import assert_python_ok, assert_python_failure +from test.support import os_helper MS_WINDOWS = (sys.platform == 'win32') @@ -171,7 +173,7 @@ def test_io(self): filename = __file__ out = self.get_output('-c', code, filename, PYTHONUTF8='1') - self.assertEqual(out, 'UTF-8/strict') + self.assertEqual(out.lower(), 'utf-8/strict') def _check_io_encoding(self, module, encoding=None, errors=None): filename = __file__ @@ -193,10 +195,10 @@ def _check_io_encoding(self, module, encoding=None, errors=None): PYTHONUTF8='1') if not encoding: - encoding = 'UTF-8' + encoding = 'utf-8' if not errors: errors = 'strict' - self.assertEqual(out, f'{encoding}/{errors}') + self.assertEqual(out.lower(), f'{encoding}/{errors}') def check_io_encoding(self, module): self._check_io_encoding(module, encoding="latin1") @@ -215,12 +217,12 @@ def test_pyio_encoding(self): def test_locale_getpreferredencoding(self): code = 'import locale; print(locale.getpreferredencoding(False), locale.getpreferredencoding(True))' out = self.get_output('-X', 'utf8', '-c', code) - self.assertEqual(out, 'UTF-8 UTF-8') + self.assertEqual(out, 'utf-8 utf-8') for loc in POSIX_LOCALES: with self.subTest(LC_ALL=loc): out = self.get_output('-X', 'utf8', '-c', code, LC_ALL=loc) - self.assertEqual(out, 'UTF-8 UTF-8') + self.assertEqual(out, 'utf-8 utf-8') @unittest.skipIf(MS_WINDOWS, 'test specific to Unix') def test_cmd_line(self): @@ -268,6 +270,32 @@ def test_optim_level(self): out = self.get_output('-X', 'utf8', '-E', '-c', code) self.assertEqual(out, '1') + @unittest.skipIf(MS_WINDOWS, + "os.device_encoding() doesn't implement " + "the UTF-8 Mode on Windows") + @support.requires_subprocess() + def test_device_encoding(self): + # Use stdout as TTY + if not sys.stdout.isatty(): + self.skipTest("sys.stdout is not a TTY") + + filename = 'out.txt' + self.addCleanup(os_helper.unlink, filename) + + code = (f'import os, sys; fd = sys.stdout.fileno(); ' + f'out = open({filename!r}, "w", encoding="utf-8"); ' + f'print(os.isatty(fd), os.device_encoding(fd), file=out); ' + f'out.close()') + cmd = [sys.executable, '-X', 'utf8', '-c', code] + # The stdout TTY is inherited to the child process + proc = subprocess.run(cmd, text=True) + self.assertEqual(proc.returncode, 0, proc) + + # In UTF-8 Mode, device_encoding(fd) returns "UTF-8" if fd is a TTY + with open(filename, encoding="utf8") as fp: + out = fp.read().rstrip() + self.assertEqual(out, 'True utf-8') + if __name__ == "__main__": unittest.main() From 25398fe6221569a3a42952a18f0ed058f72cf200 Mon Sep 17 00:00:00 2001 From: Jeong YunWon Date: Fri, 24 Feb 2023 02:07:44 +0900 Subject: [PATCH 5/5] adjust success/filure for test_utf8_mode --- Lib/test/test_utf8_mode.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Lib/test/test_utf8_mode.py b/Lib/test/test_utf8_mode.py index b852d54e71..014f133224 100644 --- a/Lib/test/test_utf8_mode.py +++ b/Lib/test/test_utf8_mode.py @@ -161,8 +161,6 @@ def test_stdio(self): 'stdout: utf-8/namereplace', 'stderr: utf-8/backslashreplace']) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_io(self): code = textwrap.dedent(''' import sys @@ -206,8 +204,6 @@ def check_io_encoding(self, module): self._check_io_encoding(module, encoding="latin1", errors="namereplace") - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_io_encoding(self): self.check_io_encoding('io') @@ -270,6 +266,8 @@ def test_optim_level(self): out = self.get_output('-X', 'utf8', '-E', '-c', code) self.assertEqual(out, '1') + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipIf(MS_WINDOWS, "os.device_encoding() doesn't implement " "the UTF-8 Mode on Windows")