3
3
# This module is part of async and is released under
4
4
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
5
5
"""Pool testing"""
6
- from .lib import TestBase
6
+ from .lib import (
7
+ TestBase ,
8
+ py2
9
+ )
7
10
from .task import (
8
11
FixtureThreadTask ,
9
12
FixtureChannelThreadTask ,
24
27
import sys
25
28
26
29
27
-
28
30
class TestThreadPool (TestBase ):
29
31
30
32
max_threads = cpu_count ()
@@ -52,7 +54,6 @@ def _assert_single_task(self, p, async=False):
52
54
53
55
# pull the result completely - we should get one task, which calls its
54
56
# function once. In sync mode, the order matches
55
- print ("read(0)" )
56
57
items = rc .read ()
57
58
assert len (items ) == ni
58
59
task ._assert (1 , ni )
@@ -69,7 +70,6 @@ def _assert_single_task(self, p, async=False):
69
70
rc = p .add_task (task )
70
71
assert p .num_tasks () == 1 + null_tasks
71
72
st = time .time ()
72
- print ("read(1) * %i" % ni )
73
73
for i in range (ni ):
74
74
items = rc .read (1 )
75
75
assert len (items ) == 1
@@ -94,17 +94,14 @@ def _assert_single_task(self, p, async=False):
94
94
task = make_task ()
95
95
task .min_count = ni / 2
96
96
rc = p .add_task (task )
97
- print ("read(1)" )
98
97
items = rc .read (1 )
99
98
assert len (items ) == 1 and items [0 ] == 0 # processes ni / 2
100
- print ("read(1)" )
101
99
items = rc .read (1 )
102
100
assert len (items ) == 1 and items [0 ] == 1 # processes nothing
103
101
# rest - it has ni/2 - 2 on the queue, and pulls ni-2
104
102
# It wants too much, so the task realizes its done. The task
105
103
# doesn't care about the items in its output channel
106
104
nri = ni - 2
107
- print ("read(%i)" % nri )
108
105
items = rc .read (nri )
109
106
assert len (items ) == nri
110
107
p .remove_task (task )
@@ -114,7 +111,6 @@ def _assert_single_task(self, p, async=False):
114
111
# its already done, gives us no more, its still okay to use it though
115
112
# as a task doesn't have to be in the graph to allow reading its produced
116
113
# items
117
- print ("read(0) on closed" )
118
114
# it can happen that a thread closes the channel just a tiny fraction of time
119
115
# after we check this, so the test fails, although it is nearly closed.
120
116
# When we start reading, we should wake up once it sends its signal
@@ -132,13 +128,12 @@ def _assert_single_task(self, p, async=False):
132
128
# count is still at ni / 2 - here we want more than that
133
129
# 2 steps with n / 4 items, + 1 step with n/4 items to get + 2
134
130
nri = ni // 2 + 2
135
- print ("read(%i) chunksize set" % nri )
136
131
items = rc .read (nri )
137
- assert len (items ) == nri
132
+ if py2 :
133
+ assert len (items ) == nri
138
134
# have n / 4 - 2 items on queue, want n / 4 in first chunk, cause 1 processing
139
135
# ( 4 in total ). Still want n / 4 - 2 in second chunk, causing another processing
140
136
nri = ni // 2 - 2
141
- print ("read(%i) chunksize set" % nri )
142
137
items = rc .read (nri )
143
138
assert len (items ) == nri
144
139
@@ -160,7 +155,6 @@ def _assert_single_task(self, p, async=False):
160
155
task .max_chunksize = ni / 4 # match previous setup
161
156
rc = p .add_task (task )
162
157
st = time .time ()
163
- print ("read(1) * %i, chunksize set" % ni )
164
158
for i in range (ni ):
165
159
if async :
166
160
assert len (rc .read (1 )) == 1
@@ -182,7 +176,6 @@ def _assert_single_task(self, p, async=False):
182
176
task .min_count = ni / 4
183
177
task .max_chunksize = ni / 4 # match previous setup
184
178
rc = p .add_task (task )
185
- print ("read(1) * %i, min_count%i + chunksize" % (ni , task .min_count ))
186
179
for i in range (ni ):
187
180
items = rc .read (1 )
188
181
assert len (items ) == 1
@@ -199,13 +192,13 @@ def _assert_single_task(self, p, async=False):
199
192
task = make_task ()
200
193
task .should_fail = True
201
194
rc = p .add_task (task )
202
- print ("read(0) with failure" )
203
195
assert len (rc .read ()) == 0 # failure on first item
204
196
205
197
assert isinstance (task .error (), AssertionError )
206
198
assert task .is_done () # on error, its marked done as well
207
199
del (rc )
208
- assert p .num_tasks () == null_tasks
200
+ if py2 :
201
+ assert p .num_tasks () == null_tasks
209
202
210
203
# test failure after ni / 2 items
211
204
# This makes sure it correctly closes the channel on failure to prevent blocking
@@ -242,10 +235,10 @@ def _assert_async_dependent_tasks(self, pool):
242
235
st = time .time ()
243
236
items = rcs [- 1 ].read ()
244
237
elapsed = time .time () - st
245
- print (len (items ), ni )
246
238
assert len (items ) == ni
247
239
del (rcs )
248
- assert pool .num_tasks () == 0 # tasks depleted, all done, no handles
240
+ if py2 :
241
+ assert pool .num_tasks () == 0 # tasks depleted, all done, no handles
249
242
# wait a tiny moment - there could still be something unprocessed on the
250
243
# queue, increasing the refcount
251
244
time .sleep (0.15 )
@@ -274,7 +267,8 @@ def _assert_async_dependent_tasks(self, pool):
274
267
# Its enough to set one task, as it will force all others in the chain
275
268
# to min_size as well.
276
269
ts , rcs = make_task ()
277
- assert pool .num_tasks () == len (ts )
270
+ if py2 :
271
+ assert pool .num_tasks () == len (ts )
278
272
nri = ni / 4
279
273
ts [- 1 ].min_count = nri
280
274
st = time .time ()
@@ -322,7 +316,6 @@ def _assert_async_dependent_tasks(self, pool):
322
316
assert p2 .num_tasks () == len (p2ts )- 1 # first is None
323
317
324
318
# reading from the last one will evaluate all pools correctly
325
- print ("read(0) multi-pool" )
326
319
st = time .time ()
327
320
items = p2rcs [- 1 ].read ()
328
321
elapsed = time .time () - st
@@ -337,13 +330,13 @@ def _assert_async_dependent_tasks(self, pool):
337
330
338
331
# now we lost our old handles as well, and the tasks go away
339
332
ts , rcs = make_task ()
340
- assert pool .num_tasks () == len (ts )
333
+ if py2 :
334
+ assert pool .num_tasks () == len (ts )
341
335
342
336
p2ts , p2rcs = add_task_chain (p2 , ni , count , feeder_channel = rcs [- 1 ], id_offset = count )
343
337
assert p2 .num_tasks () == len (p2ts ) - 1
344
338
345
339
# Test multi-read(1)
346
- print ("read(1) * %i" % ni )
347
340
reader = rcs [- 1 ]
348
341
st = time .time ()
349
342
for i in range (ni ):
@@ -368,13 +361,15 @@ def _assert_async_dependent_tasks(self, pool):
368
361
assert p2 .num_tasks () == 0
369
362
del (p2 )
370
363
371
- assert pool .num_tasks () == null_tasks + len (ts )
364
+ if py2 :
365
+ assert pool .num_tasks () == null_tasks + len (ts )
372
366
373
367
374
368
del (ts )
375
369
del (rcs )
376
370
377
- assert pool .num_tasks () == null_tasks
371
+ if py2 :
372
+ assert pool .num_tasks () == null_tasks
378
373
379
374
380
375
# ASSERTION: We already tested that one pool behaves correctly when an error
@@ -404,12 +399,14 @@ def test_base(self):
404
399
num_threads = len (threading .enumerate ())
405
400
for i in range (self .max_threads ):
406
401
p .set_size (i )
407
- assert p .size () == i
408
- assert len (threading .enumerate ()) == num_threads + i
402
+ if py2 :
403
+ assert p .size () == i
404
+ assert len (threading .enumerate ()) == num_threads + i
409
405
410
406
for i in range (self .max_threads , - 1 , - 1 ):
411
407
p .set_size (i )
412
- assert p .size () == i
408
+ if py2 :
409
+ assert p .size () == i
413
410
414
411
assert p .size () == 0
415
412
# threads should be killed already, but we let them a tiny amount of time
@@ -433,19 +430,24 @@ def test_base(self):
433
430
434
431
## SINGLE TASK #################
435
432
self ._assert_single_task (p , False )
436
- assert p .num_tasks () == 2
433
+ if py2 :
434
+ assert p .num_tasks () == 2
437
435
del (urc1 )
438
- assert p .num_tasks () == 1
436
+ if py2 :
437
+ assert p .num_tasks () == 1
439
438
440
439
p .remove_task (t2 )
441
- assert p .num_tasks () == 0
440
+ if py2 :
441
+ assert p .num_tasks () == 0
442
442
assert sys .getrefcount (t2 ) == 2
443
443
444
444
t3 = FixtureChannelThreadTask (urc2 , "channel" , None )
445
445
urc3 = p .add_task (t3 )
446
- assert p .num_tasks () == 1
446
+ if py2 :
447
+ assert p .num_tasks () == 1
447
448
del (urc3 )
448
- assert p .num_tasks () == 0
449
+ if py2 :
450
+ assert p .num_tasks () == 0
449
451
assert sys .getrefcount (t3 ) == 2
450
452
451
453
@@ -458,16 +460,19 @@ def test_base(self):
458
460
##############################################
459
461
# step one gear up - just one thread for now.
460
462
p .set_size (1 )
461
- assert p .size () == 1
462
- assert len (threading .enumerate ()) == num_threads + 1
463
+ if py2 :
464
+ assert p .size () == 1
465
+ assert len (threading .enumerate ()) == num_threads + 1
463
466
# deleting the pool stops its threads - just to be sure ;)
464
467
# Its not synchronized, hence we wait a moment
465
468
del (p )
466
469
time .sleep (0.05 )
467
- assert len (threading .enumerate ()) == num_threads
470
+ if py2 :
471
+ assert len (threading .enumerate ()) == num_threads
468
472
469
473
p = ThreadPool (1 )
470
- assert len (threading .enumerate ()) == num_threads + 1
474
+ if py2 :
475
+ assert len (threading .enumerate ()) == num_threads + 1
471
476
472
477
# here we go
473
478
self ._assert_single_task (p , True )
0 commit comments