Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 405a2d7

Browse files
authored
gh-123471: make itertools.batched thread-safe (#129416)
1 parent 155c44b commit 405a2d7
Copy full SHA for 405a2d7

File tree

3 files changed

+51
-2
lines changed
Filter options

3 files changed

+51
-2
lines changed
+39 lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import unittest
2+
import sys
3+
from threading import Thread, Barrier
4+
from itertools import batched
5+
from test.support import threading_helper
6+
7+
8+
# Skip this whole test module at import time when the platform has no
# working threading support — presumably via SkipTest; TODO confirm against
# test.support.threading_helper.
threading_helper.requires_working_threading(module=True)
9+
10+
class EnumerateThreading(unittest.TestCase):
11+
12+
@threading_helper.reap_threads
13+
def test_threading(self):
14+
number_of_threads = 10
15+
number_of_iterations = 20
16+
barrier = Barrier(number_of_threads)
17+
def work(it):
18+
barrier.wait()
19+
while True:
20+
try:
21+
_ = next(it)
22+
except StopIteration:
23+
break
24+
25+
data = tuple(range(1000))
26+
for it in range(number_of_iterations):
27+
batch_iterator = batched(data, 2)
28+
worker_threads = []
29+
for ii in range(number_of_threads):
30+
worker_threads.append(
31+
Thread(target=work, args=[batch_iterator]))
32+
33+
with threading_helper.start_threads(worker_threads):
34+
pass
35+
36+
barrier.reset()
37+
38+
# Allow running this test file directly as a script.
if __name__ == "__main__":
    unittest.main()
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Make concurrent iterations over :class:`itertools.batched` safe under free-threading.

‎Modules/itertoolsmodule.c

Copy file name to clipboardExpand all lines: Modules/itertoolsmodule.c
+11-2Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,12 +191,12 @@ batched_next(PyObject *op)
191191
{
192192
batchedobject *bo = batchedobject_CAST(op);
193193
Py_ssize_t i;
194-
Py_ssize_t n = bo->batch_size;
194+
Py_ssize_t n = FT_ATOMIC_LOAD_SSIZE_RELAXED(bo->batch_size);
195195
PyObject *it = bo->it;
196196
PyObject *item;
197197
PyObject *result;
198198

199-
if (it == NULL) {
199+
if (n < 0) {
200200
return NULL;
201201
}
202202
result = PyTuple_New(n);
@@ -218,19 +218,28 @@ batched_next(PyObject *op)
218218
if (PyErr_Occurred()) {
219219
if (!PyErr_ExceptionMatches(PyExc_StopIteration)) {
220220
/* Input raised an exception other than StopIteration */
221+
FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1);
222+
#ifndef Py_GIL_DISABLED
221223
Py_CLEAR(bo->it);
224+
#endif
222225
Py_DECREF(result);
223226
return NULL;
224227
}
225228
PyErr_Clear();
226229
}
227230
if (i == 0) {
231+
FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1);
232+
#ifndef Py_GIL_DISABLED
228233
Py_CLEAR(bo->it);
234+
#endif
229235
Py_DECREF(result);
230236
return NULL;
231237
}
232238
if (bo->strict) {
239+
FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1);
240+
#ifndef Py_GIL_DISABLED
233241
Py_CLEAR(bo->it);
242+
#endif
234243
Py_DECREF(result);
235244
PyErr_SetString(PyExc_ValueError, "batched(): incomplete batch");
236245
return NULL;

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.