Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 84ece0b

Browse filesBrowse files
authored
add execute_concurrent_async and expose execute_concurrent_* in Session (#1229)
1 parent ef176a5 commit 84ece0b
Copy full SHA for 84ece0b

File tree

3 files changed

+192
-66
lines changed
Filter options

3 files changed

+192
-66
lines changed

‎cassandra/cluster.py

Copy file name to clipboardExpand all lines: cassandra/cluster.py
+92Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2725,6 +2725,98 @@ def execute_async(self, query, parameters=None, trace=False, custom_payload=None
27252725
future.send_request()
27262726
return future
27272727

2728+
def execute_concurrent(self, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT):
2729+
"""
2730+
Executes a sequence of (statement, parameters) tuples concurrently. Each
2731+
``parameters`` item must be a sequence or :const:`None`.
2732+
2733+
The `concurrency` parameter controls how many statements will be executed
2734+
concurrently. When :attr:`.Cluster.protocol_version` is set to 1 or 2,
2735+
it is recommended that this be kept below 100 times the number of
2736+
core connections per host times the number of connected hosts (see
2737+
:meth:`.Cluster.set_core_connections_per_host`). If that amount is exceeded,
2738+
the event loop thread may attempt to block on new connection creation,
2739+
substantially impacting throughput. If :attr:`~.Cluster.protocol_version`
2740+
is 3 or higher, you can safely experiment with higher levels of concurrency.
2741+
2742+
If `raise_on_first_error` is left as :const:`True`, execution will stop
2743+
after the first failed statement and the corresponding exception will be
2744+
raised.
2745+
2746+
`results_generator` controls how the results are returned.
2747+
2748+
* If :const:`False`, the results are returned only after all requests have completed.
2749+
* If :const:`True`, a generator expression is returned. Using a generator results in a constrained
2750+
memory footprint when the results set will be large -- results are yielded
2751+
as they return instead of materializing the entire list at once. The trade for lower memory
2752+
footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results
2753+
on-the-fly).
2754+
2755+
`execution_profile` argument is the execution profile to use for this
2756+
request, it is passed directly to :meth:`Session.execute_async`.
2757+
2758+
A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned
2759+
in the same order that the statements were passed in. If ``success`` is :const:`False`,
2760+
there was an error executing the statement, and ``result_or_exc``
2761+
will be an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc``
2762+
will be the query result.
2763+
2764+
Example usage::
2765+
2766+
select_statement = session.prepare("SELECT * FROM users WHERE id=?")
2767+
2768+
statements_and_params = []
2769+
for user_id in user_ids:
2770+
params = (user_id, )
2771+
statements_and_params.append((select_statement, params))
2772+
2773+
results = session.execute_concurrent(statements_and_params, raise_on_first_error=False)
2774+
2775+
for (success, result) in results:
2776+
if not success:
2777+
handle_error(result) # result will be an Exception
2778+
else:
2779+
process_user(result[0]) # result will be a list of rows
2780+
2781+
Note: in the case that `generators` are used, it is important to ensure the consumers do not
2782+
block or attempt further synchronous requests, because no further IO will be processed until
2783+
the consumer returns. This may also produce a deadlock in the IO event thread.
2784+
"""
2785+
from cassandra.concurrent import execute_concurrent
2786+
return execute_concurrent(self, statements_and_parameters, concurrency, raise_on_first_error, results_generator, execution_profile)
2787+
2788+
def execute_concurrent_with_args(self, statement, parameters, *args, **kwargs):
2789+
"""
2790+
Like :meth:`~cassandra.concurrent.execute_concurrent()`, but takes a single
2791+
statement and a sequence of parameters. Each item in ``parameters``
2792+
should be a sequence or :const:`None`.
2793+
2794+
Example usage::
2795+
2796+
statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)")
2797+
parameters = [(x,) for x in range(1000)]
2798+
session.execute_concurrent_with_args(statement, parameters, concurrency=50)
2799+
"""
2800+
from cassandra.concurrent import execute_concurrent_with_args
2801+
return execute_concurrent_with_args(self, statement, parameters, *args, **kwargs)
2802+
2803+
def execute_concurrent_async(self, statements_and_parameters, concurrency=100, raise_on_first_error=False, execution_profile=EXEC_PROFILE_DEFAULT):
2804+
"""
2805+
Asynchronously executes a sequence of (statement, parameters) tuples concurrently.
2806+
2807+
Args:
2808+
session: Cassandra session object.
2809+
statement_and_parameters: Iterable of (prepared CQL statement, bind parameters) tuples.
2810+
concurrency (int, optional): Number of concurrent operations. Default is 100.
2811+
raise_on_first_error (bool, optional): If True, execution stops on the first error. Default is True.
2812+
execution_profile (ExecutionProfile, optional): Execution profile to use. Default is EXEC_PROFILE_DEFAULT.
2813+
2814+
Returns:
2815+
A `Future` object that will be completed when all operations are done.
2816+
"""
2817+
from cassandra.concurrent import execute_concurrent_async
2818+
return execute_concurrent_async(self, statements_and_parameters, concurrency, raise_on_first_error, execution_profile)
2819+
27282820
def execute_graph(self, query, parameters=None, trace=False, execution_profile=EXEC_PROFILE_GRAPH_DEFAULT, execute_as=None):
27292821
"""
27302822
Executes a Gremlin query string or GraphStatement synchronously,

‎cassandra/concurrent.py

Copy file name to clipboardExpand all lines: cassandra/concurrent.py
+47-65Lines changed: 47 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -13,77 +13,23 @@
1313
# limitations under the License.
1414

1515

16+
import logging
1617
from collections import namedtuple
18+
from concurrent.futures import Future
1719
from heapq import heappush, heappop
1820
from itertools import cycle
1921
from threading import Condition
20-
import sys
2122

2223
from cassandra.cluster import ResultSet, EXEC_PROFILE_DEFAULT
2324

24-
import logging
2525
log = logging.getLogger(__name__)
2626

2727

2828
ExecutionResult = namedtuple('ExecutionResult', ['success', 'result_or_exc'])
2929

3030
def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT):
3131
"""
32-
Executes a sequence of (statement, parameters) tuples concurrently. Each
33-
``parameters`` item must be a sequence or :const:`None`.
34-
35-
The `concurrency` parameter controls how many statements will be executed
36-
concurrently. When :attr:`.Cluster.protocol_version` is set to 1 or 2,
37-
it is recommended that this be kept below 100 times the number of
38-
core connections per host times the number of connected hosts (see
39-
:meth:`.Cluster.set_core_connections_per_host`). If that amount is exceeded,
40-
the event loop thread may attempt to block on new connection creation,
41-
substantially impacting throughput. If :attr:`~.Cluster.protocol_version`
42-
is 3 or higher, you can safely experiment with higher levels of concurrency.
43-
44-
If `raise_on_first_error` is left as :const:`True`, execution will stop
45-
after the first failed statement and the corresponding exception will be
46-
raised.
47-
48-
`results_generator` controls how the results are returned.
49-
50-
* If :const:`False`, the results are returned only after all requests have completed.
51-
* If :const:`True`, a generator expression is returned. Using a generator results in a constrained
52-
memory footprint when the results set will be large -- results are yielded
53-
as they return instead of materializing the entire list at once. The trade for lower memory
54-
footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results
55-
on-the-fly).
56-
57-
`execution_profile` argument is the execution profile to use for this
58-
request, it is passed directly to :meth:`Session.execute_async`.
59-
60-
A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned
61-
in the same order that the statements were passed in. If ``success`` is :const:`False`,
62-
there was an error executing the statement, and ``result_or_exc`` will be
63-
an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc``
64-
will be the query result.
65-
66-
Example usage::
67-
68-
select_statement = session.prepare("SELECT * FROM users WHERE id=?")
69-
70-
statements_and_params = []
71-
for user_id in user_ids:
72-
params = (user_id, )
73-
statements_and_params.append((select_statement, params))
74-
75-
results = execute_concurrent(
76-
session, statements_and_params, raise_on_first_error=False)
77-
78-
for (success, result) in results:
79-
if not success:
80-
handle_error(result) # result will be an Exception
81-
else:
82-
process_user(result[0]) # result will be a list of rows
83-
84-
Note: in the case that `generators` are used, it is important to ensure the consumers do not
85-
block or attempt further synchronous requests, because no further IO will be processed until
86-
the consumer returns. This may also produce a deadlock in the IO event thread.
32+
See :meth:`.Session.execute_concurrent`.
8733
"""
8834
if concurrency <= 0:
8935
raise ValueError("concurrency must be greater than 0")
@@ -216,14 +162,50 @@ def _results(self):
216162

217163
def execute_concurrent_with_args(session, statement, parameters, *args, **kwargs):
218164
"""
219-
Like :meth:`~cassandra.concurrent.execute_concurrent()`, but takes a single
220-
statement and a sequence of parameters. Each item in ``parameters``
221-
should be a sequence or :const:`None`.
165+
See :meth:`.Session.execute_concurrent_with_args`.
166+
"""
167+
return execute_concurrent(session, zip(cycle((statement,)), parameters), *args, **kwargs)
222168

223-
Example usage::
224169

225-
statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)")
226-
parameters = [(x,) for x in range(1000)]
227-
execute_concurrent_with_args(session, statement, parameters, concurrency=50)
170+
class ConcurrentExecutorFutureResults(ConcurrentExecutorListResults):
171+
def __init__(self, session, statements_and_params, execution_profile, future):
172+
super().__init__(session, statements_and_params, execution_profile)
173+
self.future = future
174+
175+
def _put_result(self, result, idx, success):
176+
super()._put_result(result, idx, success)
177+
with self._condition:
178+
if self._current == self._exec_count:
179+
if self._exception and self._fail_fast:
180+
self.future.set_exception(self._exception)
181+
else:
182+
sorted_results = [r[1] for r in sorted(self._results_queue)]
183+
self.future.set_result(sorted_results)
184+
185+
186+
def execute_concurrent_async(
187+
session,
188+
statements_and_parameters,
189+
concurrency=100,
190+
raise_on_first_error=False,
191+
execution_profile=EXEC_PROFILE_DEFAULT
192+
):
228193
"""
229-
return execute_concurrent(session, zip(cycle((statement,)), parameters), *args, **kwargs)
194+
See :meth:`.Session.execute_concurrent_async`.
195+
"""
196+
# Create a Future object and initialize the custom ConcurrentExecutor with the Future
197+
future = Future()
198+
executor = ConcurrentExecutorFutureResults(
199+
session=session,
200+
statements_and_params=statements_and_parameters,
201+
execution_profile=execution_profile,
202+
future=future
203+
)
204+
205+
# Execute concurrently
206+
try:
207+
executor.execute(concurrency=concurrency, fail_fast=raise_on_first_error)
208+
except Exception as e:
209+
future.set_exception(e)
210+
211+
return future

‎tests/unit/test_concurrent.py

Copy file name to clipboardExpand all lines: tests/unit/test_concurrent.py
+53-1Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import platform
2525

2626
from cassandra.cluster import Cluster, Session
27-
from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args
27+
from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args, execute_concurrent_async
2828
from cassandra.pool import Host
2929
from cassandra.policies import SimpleConvictionPolicy
3030
from tests.unit.utils import mock_session_pools
@@ -239,6 +239,58 @@ def validate_result_ordering(self, results):
239239
self.assertLess(last_time_added, current_time_added)
240240
last_time_added = current_time_added
241241

242+
def insert_and_validate_list_async(self, reverse, slowdown):
243+
"""
244+
This utility method will execute submit various statements for execution using execute_concurrent_async,
245+
then invoke a separate thread to execute the callback associated with the futures registered
246+
for those statements. The parameters will toggle various timing, and ordering changes.
247+
Finally it will validate that the results were returned in the order they were submitted
248+
:param reverse: Execute the callbacks in the opposite order that they were submitted
249+
:param slowdown: Cause intermittent queries to perform slowly
250+
"""
251+
our_handler = MockResponseResponseFuture(reverse=reverse)
252+
mock_session = Mock()
253+
statements_and_params = zip(cycle(["INSERT INTO test3rf.test (k, v) VALUES (%s, 0)"]),
254+
[(i, ) for i in range(100)])
255+
mock_session.execute_async.return_value = our_handler
256+
257+
t = TimedCallableInvoker(our_handler, slowdown=slowdown)
258+
t.start()
259+
try:
260+
future = execute_concurrent_async(mock_session, statements_and_params)
261+
results = future.result()
262+
self.validate_result_ordering(results)
263+
finally:
264+
t.stop()
265+
266+
def test_results_ordering_async_forward(self):
267+
"""
268+
This tests the ordering of our execute_concurrent_async function
269+
when queries complete in the order they were executed.
270+
"""
271+
self.insert_and_validate_list_async(False, False)
272+
273+
def test_results_ordering_async_reverse(self):
274+
"""
275+
This tests the ordering of our execute_concurrent_async function
276+
when queries complete in the reverse order they were executed.
277+
"""
278+
self.insert_and_validate_list_async(True, False)
279+
280+
def test_results_ordering_async_forward_slowdown(self):
281+
"""
282+
This tests the ordering of our execute_concurrent_async function
283+
when queries complete in the order they were executed, with slow queries mixed in.
284+
"""
285+
self.insert_and_validate_list_async(False, True)
286+
287+
def test_results_ordering_async_reverse_slowdown(self):
288+
"""
289+
This tests the ordering of our execute_concurrent_async function
290+
when queries complete in the reverse order they were executed, with slow queries mixed in.
291+
"""
292+
self.insert_and_validate_list_async(True, True)
293+
242294
@mock_session_pools
243295
def test_recursion_limited(self):
244296
"""

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.