Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
122 changes: 111 additions & 11 deletions 122 Lib/test/test_pty.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,22 @@
import io # readline
import unittest

import struct
import tty
import fcntl
import platform
import warnings

TEST_STRING_1 = b"I wish to buy a fish license.\n"
TEST_STRING_2 = b"For my pet fish, Eric.\n"

try:
_TIOCGWINSZ = tty.TIOCGWINSZ
_TIOCSWINSZ = tty.TIOCSWINSZ
_HAVE_WINSZ = True
except AttributeError:
_HAVE_WINSZ = False

if verbose:
def debug(msg):
print(msg)
Expand Down Expand Up @@ -60,6 +73,27 @@ def _readline(fd):
reader = io.FileIO(fd, mode='rb', closefd=False)
return reader.readline()

def expectedFailureIfStdinIsTTY(fun):
# avoid isatty() for now
try:
tty.tcgetattr(pty.STDIN_FILENO)
return unittest.expectedFailure(fun)
except tty.error:
pass
return fun

def expectedFailureOnBSD(fun):
PLATFORM = platform.system()
if PLATFORM.endswith("BSD") or PLATFORM == "Darwin":
return unittest.expectedFailure(fun)
Comment thread
ethanfurman marked this conversation as resolved.
Outdated
return fun

def _get_term_winsz(fd):
s = struct.pack("HHHH", 0, 0, 0, 0)
return fcntl.ioctl(fd, _TIOCGWINSZ, s)

def _set_term_winsz(fd, winsz):
fcntl.ioctl(fd, _TIOCSWINSZ, winsz)


# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
Expand All @@ -78,6 +112,20 @@ def setUp(self):
self.addCleanup(signal.alarm, 0)
signal.alarm(10)

# Save original stdin window size
self.stdin_rows = None
self.stdin_cols = None
if _HAVE_WINSZ:
try:
stdin_dim = os.get_terminal_size(pty.STDIN_FILENO)
self.stdin_rows = stdin_dim.lines
self.stdin_cols = stdin_dim.columns
old_stdin_winsz = struct.pack("HHHH", self.stdin_rows,
self.stdin_cols, 0, 0)
self.addCleanup(_set_term_winsz, pty.STDIN_FILENO, old_stdin_winsz)
except OSError:
pass

def handle_sig(self, sig, frame):
self.fail("isatty hung")

Expand All @@ -86,26 +134,65 @@ def handle_sighup(signum, frame):
# bpo-38547: if the process is the session leader, os.close(master_fd)
# of "master_fd, slave_name = pty.master_open()" raises SIGHUP
# signal: just ignore the signal.
#
# NOTE: the above comment is from an older version of the test;
# master_open() is not being used anymore.
pass

def test_basic(self):
@expectedFailureIfStdinIsTTY
def test_openpty(self):
try:
debug("Calling master_open()")
master_fd, slave_name = pty.master_open()
debug("Got master_fd '%d', slave_name '%s'" %
(master_fd, slave_name))
debug("Calling slave_open(%r)" % (slave_name,))
slave_fd = pty.slave_open(slave_name)
debug("Got slave_fd '%d'" % slave_fd)
mode = tty.tcgetattr(pty.STDIN_FILENO)
except tty.error:
# not a tty or bad/closed fd
debug("tty.tcgetattr(pty.STDIN_FILENO) failed")
mode = None

new_stdin_winsz = None
if self.stdin_rows != None and self.stdin_cols != None:
try:
debug("Setting pty.STDIN_FILENO window size")
# Set number of columns and rows to be the
# floors of 1/5 of respective original values
target_stdin_winsz = struct.pack("HHHH", self.stdin_rows//5,
self.stdin_cols//5, 0, 0)
_set_term_winsz(pty.STDIN_FILENO, target_stdin_winsz)

# Were we able to set the window size
# of pty.STDIN_FILENO successfully?
new_stdin_winsz = _get_term_winsz(pty.STDIN_FILENO)
self.assertEqual(new_stdin_winsz, target_stdin_winsz,
"pty.STDIN_FILENO window size unchanged")
except OSError:
warnings.warn("Failed to set pty.STDIN_FILENO window size")
pass

try:
debug("Calling pty.openpty()")
try:
master_fd, slave_fd = pty.openpty(mode, new_stdin_winsz)
except TypeError:
master_fd, slave_fd = pty.openpty()
debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")
except OSError:
# " An optional feature could not be imported " ... ?
raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.")

self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')
self.assertTrue(os.isatty(slave_fd), "slave_fd is not a tty")

if mode:
self.assertEqual(tty.tcgetattr(slave_fd), mode,
"openpty() failed to set slave termios")
if new_stdin_winsz:
self.assertEqual(_get_term_winsz(slave_fd), new_stdin_winsz,
"openpty() failed to set slave window size")

# Solaris requires reading the fd before anything is returned.
# My guess is that since we open and close the slave fd
# in master_open(), we need to read the EOF.
#
# NOTE: the above comment is from an older version of the test;
# master_open() is not being used anymore.

# Ensure the fd is non-blocking in case there's nothing to read.
blocking = os.get_blocking(master_fd)
Expand Down Expand Up @@ -222,8 +309,20 @@ def test_fork(self):

os.close(master_fd)

# pty.fork() passed.
@expectedFailureOnBSD
def test_master_read(self):
debug("Calling pty.openpty()")
master_fd, slave_fd = pty.openpty()
debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")

debug("Closing slave_fd")
os.close(slave_fd)

debug("Reading from master_fd")
with self.assertRaises(OSError):
os.read(master_fd, 1)

os.close(master_fd)

class SmallPtyTests(unittest.TestCase):
"""These tests don't spawn children or hang."""
Expand Down Expand Up @@ -262,8 +361,9 @@ def _socketpair(self):
self.files.extend(socketpair)
return socketpair

def _mock_select(self, rfds, wfds, xfds):
def _mock_select(self, rfds, wfds, xfds, timeout=0):
Comment thread
ethanfurman marked this conversation as resolved.
Outdated
# This will raise IndexError when no more expected calls exist.
# This ignores the timeout
self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
return self.select_rfds_results.pop(0), [], []

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Updated tests for the pty library. test_basic() has been changed to test_openpty(); this additionally checks if slave termios and slave winsize are being set properly by pty.openpty(). In order to add support for FreeBSD, NetBSD, OpenBSD, and Darwin, this also adds test_master_read(), which demonstrates that pty.spawn() should not depend on an OSError to exit from its copy loop.
Morty Proxy This is a proxified and sanitized view of the page, visit original site.