13
13
# limitations under the License.
14
14
15
15
16
+ import logging
16
17
from collections import namedtuple
18
+ from concurrent .futures import Future
17
19
from heapq import heappush , heappop
18
20
from itertools import cycle
19
21
from threading import Condition
20
- import sys
21
22
22
23
from cassandra .cluster import ResultSet , EXEC_PROFILE_DEFAULT
23
24
24
- import logging
25
25
log = logging .getLogger (__name__ )
26
26
27
27
28
28
ExecutionResult = namedtuple ('ExecutionResult' , ['success' , 'result_or_exc' ])
29
29
30
30
def execute_concurrent (session , statements_and_parameters , concurrency = 100 , raise_on_first_error = True , results_generator = False , execution_profile = EXEC_PROFILE_DEFAULT ):
31
31
"""
32
- Executes a sequence of (statement, parameters) tuples concurrently. Each
33
- ``parameters`` item must be a sequence or :const:`None`.
34
-
35
- The `concurrency` parameter controls how many statements will be executed
36
- concurrently. When :attr:`.Cluster.protocol_version` is set to 1 or 2,
37
- it is recommended that this be kept below 100 times the number of
38
- core connections per host times the number of connected hosts (see
39
- :meth:`.Cluster.set_core_connections_per_host`). If that amount is exceeded,
40
- the event loop thread may attempt to block on new connection creation,
41
- substantially impacting throughput. If :attr:`~.Cluster.protocol_version`
42
- is 3 or higher, you can safely experiment with higher levels of concurrency.
43
-
44
- If `raise_on_first_error` is left as :const:`True`, execution will stop
45
- after the first failed statement and the corresponding exception will be
46
- raised.
47
-
48
- `results_generator` controls how the results are returned.
49
-
50
- * If :const:`False`, the results are returned only after all requests have completed.
51
- * If :const:`True`, a generator expression is returned. Using a generator results in a constrained
52
- memory footprint when the results set will be large -- results are yielded
53
- as they return instead of materializing the entire list at once. The trade for lower memory
54
- footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results
55
- on-the-fly).
56
-
57
- `execution_profile` argument is the execution profile to use for this
58
- request, it is passed directly to :meth:`Session.execute_async`.
59
-
60
- A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned
61
- in the same order that the statements were passed in. If ``success`` is :const:`False`,
62
- there was an error executing the statement, and ``result_or_exc`` will be
63
- an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc``
64
- will be the query result.
65
-
66
- Example usage::
67
-
68
- select_statement = session.prepare("SELECT * FROM users WHERE id=?")
69
-
70
- statements_and_params = []
71
- for user_id in user_ids:
72
- params = (user_id, )
73
- statements_and_params.append((select_statement, params))
74
-
75
- results = execute_concurrent(
76
- session, statements_and_params, raise_on_first_error=False)
77
-
78
- for (success, result) in results:
79
- if not success:
80
- handle_error(result) # result will be an Exception
81
- else:
82
- process_user(result[0]) # result will be a list of rows
83
-
84
- Note: in the case that `generators` are used, it is important to ensure the consumers do not
85
- block or attempt further synchronous requests, because no further IO will be processed until
86
- the consumer returns. This may also produce a deadlock in the IO event thread.
32
+ See :meth:`.Session.execute_concurrent`.
87
33
"""
88
34
if concurrency <= 0 :
89
35
raise ValueError ("concurrency must be greater than 0" )
@@ -216,14 +162,50 @@ def _results(self):
216
162
217
163
def execute_concurrent_with_args (session , statement , parameters , * args , ** kwargs ):
218
164
"""
219
- Like :meth:`~cassandra.concurrent.execute_concurrent()`, but takes a single
220
- statement and a sequence of parameters. Each item in ``parameters``
221
- should be a sequence or :const:`None`.
165
+ See :meth:`.Session.execute_concurrent_with_args`.
166
+ """
167
+ return execute_concurrent ( session , zip ( cycle (( statement ,)), parameters ), * args , ** kwargs )
222
168
223
- Example usage::
224
169
225
- statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)")
226
- parameters = [(x,) for x in range(1000)]
227
- execute_concurrent_with_args(session, statement, parameters, concurrency=50)
170
+ class ConcurrentExecutorFutureResults (ConcurrentExecutorListResults ):
171
+ def __init__ (self , session , statements_and_params , execution_profile , future ):
172
+ super ().__init__ (session , statements_and_params , execution_profile )
173
+ self .future = future
174
+
175
+ def _put_result (self , result , idx , success ):
176
+ super ()._put_result (result , idx , success )
177
+ with self ._condition :
178
+ if self ._current == self ._exec_count :
179
+ if self ._exception and self ._fail_fast :
180
+ self .future .set_exception (self ._exception )
181
+ else :
182
+ sorted_results = [r [1 ] for r in sorted (self ._results_queue )]
183
+ self .future .set_result (sorted_results )
184
+
185
+
186
+ def execute_concurrent_async (
187
+ session ,
188
+ statements_and_parameters ,
189
+ concurrency = 100 ,
190
+ raise_on_first_error = False ,
191
+ execution_profile = EXEC_PROFILE_DEFAULT
192
+ ):
228
193
"""
229
- return execute_concurrent (session , zip (cycle ((statement ,)), parameters ), * args , ** kwargs )
194
+ See :meth:`.Session.execute_concurrent_async`.
195
+ """
196
+ # Create a Future object and initialize the custom ConcurrentExecutor with the Future
197
+ future = Future ()
198
+ executor = ConcurrentExecutorFutureResults (
199
+ session = session ,
200
+ statements_and_params = statements_and_parameters ,
201
+ execution_profile = execution_profile ,
202
+ future = future
203
+ )
204
+
205
+ # Execute concurrently
206
+ try :
207
+ executor .execute (concurrency = concurrency , fail_fast = raise_on_first_error )
208
+ except Exception as e :
209
+ future .set_exception (e )
210
+
211
+ return future
0 commit comments