Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e562932

Browse filesBrowse files
authored
Merge pull request #4837 from Masorubka1/test_kqueue.py
Add test_kqueue.py from Cpython v3.11.2
2 parents 4cb755d + 47e621a commit e562932
Copy full SHA for e562932

File tree

1 file changed

+261
-0
lines changed
Filter options

1 file changed

+261
-0
lines changed

‎Lib/test/test_kqueue.py

Copy file name to clipboard
+261Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
"""
2+
Tests for kqueue wrapper.
3+
"""
4+
import errno
5+
import os
6+
import select
7+
import socket
8+
import time
9+
import unittest
10+
11+
if not hasattr(select, "kqueue"):
12+
raise unittest.SkipTest("test works only on BSD")
13+
14+
class TestKQueue(unittest.TestCase):
15+
def test_create_queue(self):
16+
kq = select.kqueue()
17+
self.assertTrue(kq.fileno() > 0, kq.fileno())
18+
self.assertTrue(not kq.closed)
19+
kq.close()
20+
self.assertTrue(kq.closed)
21+
self.assertRaises(ValueError, kq.fileno)
22+
23+
def test_create_event(self):
24+
from operator import lt, le, gt, ge
25+
26+
fd = os.open(os.devnull, os.O_WRONLY)
27+
self.addCleanup(os.close, fd)
28+
29+
ev = select.kevent(fd)
30+
other = select.kevent(1000)
31+
self.assertEqual(ev.ident, fd)
32+
self.assertEqual(ev.filter, select.KQ_FILTER_READ)
33+
self.assertEqual(ev.flags, select.KQ_EV_ADD)
34+
self.assertEqual(ev.fflags, 0)
35+
self.assertEqual(ev.data, 0)
36+
self.assertEqual(ev.udata, 0)
37+
self.assertEqual(ev, ev)
38+
self.assertNotEqual(ev, other)
39+
self.assertTrue(ev < other)
40+
self.assertTrue(other >= ev)
41+
for op in lt, le, gt, ge:
42+
self.assertRaises(TypeError, op, ev, None)
43+
self.assertRaises(TypeError, op, ev, 1)
44+
self.assertRaises(TypeError, op, ev, "ev")
45+
46+
ev = select.kevent(fd, select.KQ_FILTER_WRITE)
47+
self.assertEqual(ev.ident, fd)
48+
self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
49+
self.assertEqual(ev.flags, select.KQ_EV_ADD)
50+
self.assertEqual(ev.fflags, 0)
51+
self.assertEqual(ev.data, 0)
52+
self.assertEqual(ev.udata, 0)
53+
self.assertEqual(ev, ev)
54+
self.assertNotEqual(ev, other)
55+
56+
ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
57+
self.assertEqual(ev.ident, fd)
58+
self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
59+
self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
60+
self.assertEqual(ev.fflags, 0)
61+
self.assertEqual(ev.data, 0)
62+
self.assertEqual(ev.udata, 0)
63+
self.assertEqual(ev, ev)
64+
self.assertNotEqual(ev, other)
65+
66+
ev = select.kevent(1, 2, 3, 4, 5, 6)
67+
self.assertEqual(ev.ident, 1)
68+
self.assertEqual(ev.filter, 2)
69+
self.assertEqual(ev.flags, 3)
70+
self.assertEqual(ev.fflags, 4)
71+
self.assertEqual(ev.data, 5)
72+
self.assertEqual(ev.udata, 6)
73+
self.assertEqual(ev, ev)
74+
self.assertNotEqual(ev, other)
75+
76+
bignum = 0x7fff
77+
ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum)
78+
self.assertEqual(ev.ident, bignum)
79+
self.assertEqual(ev.filter, 1)
80+
self.assertEqual(ev.flags, 2)
81+
self.assertEqual(ev.fflags, 3)
82+
self.assertEqual(ev.data, bignum - 1)
83+
self.assertEqual(ev.udata, bignum)
84+
self.assertEqual(ev, ev)
85+
self.assertNotEqual(ev, other)
86+
87+
# Issue 11973
88+
bignum = 0xffff
89+
ev = select.kevent(0, 1, bignum)
90+
self.assertEqual(ev.ident, 0)
91+
self.assertEqual(ev.filter, 1)
92+
self.assertEqual(ev.flags, bignum)
93+
self.assertEqual(ev.fflags, 0)
94+
self.assertEqual(ev.data, 0)
95+
self.assertEqual(ev.udata, 0)
96+
self.assertEqual(ev, ev)
97+
self.assertNotEqual(ev, other)
98+
99+
# Issue 11973
100+
bignum = 0xffffffff
101+
ev = select.kevent(0, 1, 2, bignum)
102+
self.assertEqual(ev.ident, 0)
103+
self.assertEqual(ev.filter, 1)
104+
self.assertEqual(ev.flags, 2)
105+
self.assertEqual(ev.fflags, bignum)
106+
self.assertEqual(ev.data, 0)
107+
self.assertEqual(ev.udata, 0)
108+
self.assertEqual(ev, ev)
109+
self.assertNotEqual(ev, other)
110+
111+
112+
def test_queue_event(self):
113+
serverSocket = socket.create_server(('127.0.0.1', 0))
114+
client = socket.socket()
115+
client.setblocking(False)
116+
try:
117+
client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
118+
except OSError as e:
119+
self.assertEqual(e.args[0], errno.EINPROGRESS)
120+
else:
121+
#raise AssertionError("Connect should have raised EINPROGRESS")
122+
pass # FreeBSD doesn't raise an exception here
123+
server, addr = serverSocket.accept()
124+
125+
kq = select.kqueue()
126+
kq2 = select.kqueue.fromfd(kq.fileno())
127+
128+
ev = select.kevent(server.fileno(),
129+
select.KQ_FILTER_WRITE,
130+
select.KQ_EV_ADD | select.KQ_EV_ENABLE)
131+
kq.control([ev], 0)
132+
ev = select.kevent(server.fileno(),
133+
select.KQ_FILTER_READ,
134+
select.KQ_EV_ADD | select.KQ_EV_ENABLE)
135+
kq.control([ev], 0)
136+
ev = select.kevent(client.fileno(),
137+
select.KQ_FILTER_WRITE,
138+
select.KQ_EV_ADD | select.KQ_EV_ENABLE)
139+
kq2.control([ev], 0)
140+
ev = select.kevent(client.fileno(),
141+
select.KQ_FILTER_READ,
142+
select.KQ_EV_ADD | select.KQ_EV_ENABLE)
143+
kq2.control([ev], 0)
144+
145+
events = kq.control(None, 4, 1)
146+
events = set((e.ident, e.filter) for e in events)
147+
self.assertEqual(events, set([
148+
(client.fileno(), select.KQ_FILTER_WRITE),
149+
(server.fileno(), select.KQ_FILTER_WRITE)]))
150+
151+
client.send(b"Hello!")
152+
server.send(b"world!!!")
153+
154+
# We may need to call it several times
155+
for i in range(10):
156+
events = kq.control(None, 4, 1)
157+
if len(events) == 4:
158+
break
159+
time.sleep(1.0)
160+
else:
161+
self.fail('timeout waiting for event notifications')
162+
163+
events = set((e.ident, e.filter) for e in events)
164+
self.assertEqual(events, set([
165+
(client.fileno(), select.KQ_FILTER_WRITE),
166+
(client.fileno(), select.KQ_FILTER_READ),
167+
(server.fileno(), select.KQ_FILTER_WRITE),
168+
(server.fileno(), select.KQ_FILTER_READ)]))
169+
170+
# Remove completely client, and server read part
171+
ev = select.kevent(client.fileno(),
172+
select.KQ_FILTER_WRITE,
173+
select.KQ_EV_DELETE)
174+
kq.control([ev], 0)
175+
ev = select.kevent(client.fileno(),
176+
select.KQ_FILTER_READ,
177+
select.KQ_EV_DELETE)
178+
kq.control([ev], 0)
179+
ev = select.kevent(server.fileno(),
180+
select.KQ_FILTER_READ,
181+
select.KQ_EV_DELETE)
182+
kq.control([ev], 0, 0)
183+
184+
events = kq.control([], 4, 0.99)
185+
events = set((e.ident, e.filter) for e in events)
186+
self.assertEqual(events, set([
187+
(server.fileno(), select.KQ_FILTER_WRITE)]))
188+
189+
client.close()
190+
server.close()
191+
serverSocket.close()
192+
193+
def testPair(self):
194+
kq = select.kqueue()
195+
a, b = socket.socketpair()
196+
197+
a.send(b'foo')
198+
event1 = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
199+
event2 = select.kevent(b, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
200+
r = kq.control([event1, event2], 1, 1)
201+
self.assertTrue(r)
202+
self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
203+
self.assertEqual(b.recv(r[0].data), b'foo')
204+
205+
a.close()
206+
b.close()
207+
kq.close()
208+
209+
def test_issue30058(self):
210+
# changelist must be an iterable
211+
kq = select.kqueue()
212+
a, b = socket.socketpair()
213+
ev = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
214+
215+
kq.control([ev], 0)
216+
# not a list
217+
kq.control((ev,), 0)
218+
# __len__ is not consistent with __iter__
219+
class BadList:
220+
def __len__(self):
221+
return 0
222+
def __iter__(self):
223+
for i in range(100):
224+
yield ev
225+
kq.control(BadList(), 0)
226+
# doesn't have __len__
227+
kq.control(iter([ev]), 0)
228+
229+
a.close()
230+
b.close()
231+
kq.close()
232+
233+
def test_close(self):
234+
open_file = open(__file__, "rb")
235+
self.addCleanup(open_file.close)
236+
fd = open_file.fileno()
237+
kqueue = select.kqueue()
238+
239+
# test fileno() method and closed attribute
240+
self.assertIsInstance(kqueue.fileno(), int)
241+
self.assertFalse(kqueue.closed)
242+
243+
# test close()
244+
kqueue.close()
245+
self.assertTrue(kqueue.closed)
246+
self.assertRaises(ValueError, kqueue.fileno)
247+
248+
# close() can be called more than once
249+
kqueue.close()
250+
251+
# operations must fail with ValueError("I/O operation on closed ...")
252+
self.assertRaises(ValueError, kqueue.control, None, 4)
253+
254+
def test_fd_non_inheritable(self):
255+
kqueue = select.kqueue()
256+
self.addCleanup(kqueue.close)
257+
self.assertEqual(os.get_inheritable(kqueue.fileno()), False)
258+
259+
260+
if __name__ == "__main__":
261+
unittest.main()

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.