Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Latest commit

 

History

History
History
118 lines (99 loc) · 3.58 KB

File metadata and controls

118 lines (99 loc) · 3.58 KB
Copy raw file
Download raw file
Open symbols panel
Edit and raw actions
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import fcntl
import os
import pty
import struct
import sys
import termios
import textwrap
import unittest
try:
from unittest import skip
except ImportError:
def skip(f):
return lambda self: None
try:
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.protocol import ProcessProtocol
from twisted.trial.unittest import TestCase as TrialTestCase
except ImportError:
reactor = None
TEST_CONFIG = os.path.join(os.path.dirname(__file__), "test.config")
def set_win_size(fd, rows, columns):
s = struct.pack('HHHH', rows, columns, 0, 0)
fcntl.ioctl(fd, termios.TIOCSWINSZ, s)
class CrashersTest(object):
backend = "cli"
def run_bpython(self, input):
"""
Run bpython (with `backend` as backend) in a subprocess and
enter the given input. Uses a test config that disables the
paste detection.
Retuns bpython's output.
"""
result = Deferred()
class Protocol(ProcessProtocol):
STATES = (SEND_INPUT, COLLECT) = xrange(2)
def __init__(self):
self.data = ""
self.delayed_call = None
self.states = iter(self.STATES)
self.state = self.states.next()
def outReceived(self, data):
self.data += data
if self.delayed_call is not None:
self.delayed_call.cancel()
self.delayed_call = reactor.callLater(0.5, self.next)
def next(self):
self.delayed_call = None
if self.state == self.SEND_INPUT:
index = self.data.find(">>> ")
if index >= 0:
self.data = self.data[index + 4:]
self.transport.write(input)
self.state = self.states.next()
else:
self.transport.closeStdin()
if self.transport.pid is not None:
self.delayed_call = None
self.transport.signalProcess("TERM")
def processExited(self, reason):
if self.delayed_call is not None:
self.delayed_call.cancel()
result.callback(self.data)
(master, slave) = pty.openpty()
set_win_size(slave, 25, 80)
reactor.spawnProcess(Protocol(), sys.executable,
(sys.executable, "-m", "bpython." + self.backend,
"--config", TEST_CONFIG),
env=dict(TERM="vt100", LANG=os.environ.get("LANG", "")),
usePTY=(master, slave, os.ttyname(slave)))
return result
def test_issue108(self):
input = textwrap.dedent(
"""\
def spam():
u"y\\xe4y"
\b
spam(""")
deferred = self.run_bpython(input)
return deferred.addCallback(self.check_no_traceback)
def test_issue133(self):
input = textwrap.dedent(
"""\
def spam(a, (b, c)):
pass
\b
spam(1""")
return self.run_bpython(input).addCallback(self.check_no_traceback)
def check_no_traceback(self, data):
tb = data[data.find("Traceback"):]
self.assertTrue("Traceback" not in data, tb)
if reactor is not None:
class CursesCrashersTest(TrialTestCase, CrashersTest):
backend = "cli"
@skip("take 6 seconds, and Simon says we can skip them")
class UrwidCrashersTest(TrialTestCase, CrashersTest):
backend = "urwid"
if __name__ == "__main__":
unittest.main()
Morty Proxy This is a proxified and sanitized view of the page, visit original site.