@@ -271,7 +271,49 @@ def _refresh_controller_id(self):
271
271
"Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
272
272
.format (version ))
273
273
274
- def _find_group_coordinator_id (self , group_id ):
274
+ def _find_coordinator_id_send_request (self , group_id ):
275
+ """Send a FindCoordinatorRequest to a broker.
276
+
277
+ :param group_id: The consumer group ID. This is typically the group
278
+ name as a string.
279
+ :return: A message future
280
+ """
281
+ # TODO add support for dynamically picking version of
282
+ # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
283
+ # When I experimented with this, the coordinator value returned in
284
+ # GroupCoordinatorResponse_v1 didn't match the value returned by
285
+ # GroupCoordinatorResponse_v0 and I couldn't figure out why.
286
+ version = 0
287
+ # version = self._matching_api_version(GroupCoordinatorRequest)
288
+ if version <= 0 :
289
+ request = GroupCoordinatorRequest [version ](group_id )
290
+ else :
291
+ raise NotImplementedError (
292
+ "Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
293
+ .format (version ))
294
+ return self ._send_request_to_node (self ._client .least_loaded_node (), request )
295
+
296
+ def _find_coordinator_id_process_response (self , response ):
297
+ """Process a FindCoordinatorResponse.
298
+
299
+ :param response: a FindCoordinatorResponse.
300
+ :return: The node_id of the broker that is the coordinator.
301
+ """
302
+ if response .API_VERSION <= 0 :
303
+ error_type = Errors .for_code (response .error_code )
304
+ if error_type is not Errors .NoError :
305
+ # Note: When error_type.retriable, Java will retry... see
306
+ # KafkaAdminClient's handleFindCoordinatorError method
307
+ raise error_type (
308
+ "FindCoordinatorRequest failed with response '{}'."
309
+ .format (response ))
310
+ else :
311
+ raise NotImplementedError (
312
+ "Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
313
+ .format (response .API_VERSION ))
314
+ return response .coordinator_id
315
+
316
+ def _find_coordinator_id (self , group_id ):
275
317
"""Find the broker node_id of the coordinator of the given group.
276
318
277
319
Sends a FindCoordinatorRequest message to the cluster. Will block until
@@ -283,35 +325,10 @@ def _find_group_coordinator_id(self, group_id):
283
325
:return: The node_id of the broker that is the coordinator.
284
326
"""
285
327
# Note: Java may change how this is implemented in KAFKA-6791.
286
- #
287
- # TODO add support for dynamically picking version of
288
- # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
289
- # When I experimented with this, GroupCoordinatorResponse_v1 didn't
290
- # match GroupCoordinatorResponse_v0 and I couldn't figure out why.
291
- gc_request = GroupCoordinatorRequest [0 ](group_id )
292
- future = self ._send_request_to_node (self ._client .least_loaded_node (), gc_request )
293
-
328
+ future = self ._find_coordinator_id_send_request (group_id )
294
329
self ._wait_for_futures ([future ])
295
-
296
- gc_response = future .value
297
- # use the extra error checking in add_group_coordinator() rather than
298
- # immediately returning the group coordinator.
299
- success = self ._client .cluster .add_group_coordinator (group_id , gc_response )
300
- if not success :
301
- error_type = Errors .for_code (gc_response .error_code )
302
- assert error_type is not Errors .NoError
303
- # Note: When error_type.retriable, Java will retry... see
304
- # KafkaAdminClient's handleFindCoordinatorError method
305
- raise error_type (
306
- "Could not identify group coordinator for group_id '{}' from response '{}'."
307
- .format (group_id , gc_response ))
308
- group_coordinator = self ._client .cluster .coordinator_for_group (group_id )
309
- # will be None if the coordinator was never populated, which should never happen here
310
- assert group_coordinator is not None
311
- # will be -1 if add_group_coordinator() failed... but by this point the
312
- # error should have been raised.
313
- assert group_coordinator != - 1
314
- return group_coordinator
330
+ response = future .value
331
+ return self ._find_coordinator_id_process_response (response )
315
332
316
333
def _send_request_to_node (self , node_id , request ):
317
334
"""Send a Kafka protocol message to a specific broker.
@@ -329,7 +346,6 @@ def _send_request_to_node(self, node_id, request):
329
346
self ._client .poll ()
330
347
return self ._client .send (node_id , request )
331
348
332
-
333
349
def _send_request_to_controller (self , request ):
334
350
"""Send a Kafka protocol message to the cluster controller.
335
351
@@ -678,7 +694,7 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
678
694
if group_coordinator_id is not None :
679
695
this_groups_coordinator_id = group_coordinator_id
680
696
else :
681
- this_groups_coordinator_id = self ._find_group_coordinator_id (group_id )
697
+ this_groups_coordinator_id = self ._find_coordinator_id (group_id )
682
698
f = self ._describe_consumer_groups_send_request (group_id , this_groups_coordinator_id )
683
699
futures .append (f )
684
700
@@ -853,7 +869,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
853
869
explicitly specified.
854
870
"""
855
871
if group_coordinator_id is None :
856
- group_coordinator_id = self ._find_group_coordinator_id (group_id )
872
+ group_coordinator_id = self ._find_coordinator_id (group_id )
857
873
future = self ._list_consumer_group_offsets_send_request (
858
874
group_id , group_coordinator_id , partitions )
859
875
self ._wait_for_futures ([future ])
0 commit comments