Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 941e908

Browse filesBrowse files
committed
Break FindCoordinator into request/response methods
This splits the `_find_coordinator_id()` method (which is blocking) into request generation / response parsing methods. The public API does not change. However, this allows power users who are willing to deal with risk of private methods changing under their feet to decouple generating the message futures from processing their responses. In other words, you can use these to fire a bunch of requests at once and delay processing the responses until all requests are fired. This is modeled on the work done in #1845. Additionally, I removed the code that tried to leverage the error checking from `cluster.add_group_coordinator()`. That code had changed in #1822, removing most of the error checking... so it no longer adds any value, but instead merely increases complexity and coupling.
1 parent eed25fc commit 941e908
Copy full SHA for 941e908

File tree

Expand file treeCollapse file tree

1 file changed

+48
-32
lines changed
Filter options
Expand file treeCollapse file tree

1 file changed

+48
-32
lines changed

‎kafka/admin/client.py

Copy file name to clipboardExpand all lines: kafka/admin/client.py
+48-32Lines changed: 48 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,49 @@ def _refresh_controller_id(self):
271271
"Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
272272
.format(version))
273273

274-
def _find_group_coordinator_id(self, group_id):
274+
def _find_coordinator_id_send_request(self, group_id):
275+
"""Send a FindCoordinatorRequest to a broker.
276+
277+
:param group_id: The consumer group ID. This is typically the group
278+
name as a string.
279+
:return: A message future
280+
"""
281+
# TODO add support for dynamically picking version of
282+
# GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
283+
# When I experimented with this, the coordinator value returned in
284+
# GroupCoordinatorResponse_v1 didn't match the value returned by
285+
# GroupCoordinatorResponse_v0 and I couldn't figure out why.
286+
version = 0
287+
# version = self._matching_api_version(GroupCoordinatorRequest)
288+
if version <= 0:
289+
request = GroupCoordinatorRequest[version](group_id)
290+
else:
291+
raise NotImplementedError(
292+
"Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
293+
.format(version))
294+
return self._send_request_to_node(self._client.least_loaded_node(), request)
295+
296+
def _find_coordinator_id_process_response(self, response):
297+
"""Process a FindCoordinatorResponse.
298+
299+
:param response: a FindCoordinatorResponse.
300+
:return: The node_id of the broker that is the coordinator.
301+
"""
302+
if response.API_VERSION <= 0:
303+
error_type = Errors.for_code(response.error_code)
304+
if error_type is not Errors.NoError:
305+
# Note: When error_type.retriable, Java will retry... see
306+
# KafkaAdminClient's handleFindCoordinatorError method
307+
raise error_type(
308+
"FindCoordinatorRequest failed with response '{}'."
309+
.format(response))
310+
else:
311+
raise NotImplementedError(
312+
"Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
313+
.format(response.API_VERSION))
314+
return response.coordinator_id
315+
316+
def _find_coordinator_id(self, group_id):
275317
"""Find the broker node_id of the coordinator of the given group.
276318
277319
Sends a FindCoordinatorRequest message to the cluster. Will block until
@@ -283,35 +325,10 @@ def _find_group_coordinator_id(self, group_id):
283325
:return: The node_id of the broker that is the coordinator.
284326
"""
285327
# Note: Java may change how this is implemented in KAFKA-6791.
286-
#
287-
# TODO add support for dynamically picking version of
288-
# GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
289-
# When I experimented with this, GroupCoordinatorResponse_v1 didn't
290-
# match GroupCoordinatorResponse_v0 and I couldn't figure out why.
291-
gc_request = GroupCoordinatorRequest[0](group_id)
292-
future = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
293-
328+
future = self._find_coordinator_id_send_request(group_id)
294329
self._wait_for_futures([future])
295-
296-
gc_response = future.value
297-
# use the extra error checking in add_group_coordinator() rather than
298-
# immediately returning the group coordinator.
299-
success = self._client.cluster.add_group_coordinator(group_id, gc_response)
300-
if not success:
301-
error_type = Errors.for_code(gc_response.error_code)
302-
assert error_type is not Errors.NoError
303-
# Note: When error_type.retriable, Java will retry... see
304-
# KafkaAdminClient's handleFindCoordinatorError method
305-
raise error_type(
306-
"Could not identify group coordinator for group_id '{}' from response '{}'."
307-
.format(group_id, gc_response))
308-
group_coordinator = self._client.cluster.coordinator_for_group(group_id)
309-
# will be None if the coordinator was never populated, which should never happen here
310-
assert group_coordinator is not None
311-
# will be -1 if add_group_coordinator() failed... but by this point the
312-
# error should have been raised.
313-
assert group_coordinator != -1
314-
return group_coordinator
330+
response = future.value
331+
return self._find_coordinator_id_process_response(response)
315332

316333
def _send_request_to_node(self, node_id, request):
317334
"""Send a Kafka protocol message to a specific broker.
@@ -329,7 +346,6 @@ def _send_request_to_node(self, node_id, request):
329346
self._client.poll()
330347
return self._client.send(node_id, request)
331348

332-
333349
def _send_request_to_controller(self, request):
334350
"""Send a Kafka protocol message to the cluster controller.
335351
@@ -678,7 +694,7 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
678694
if group_coordinator_id is not None:
679695
this_groups_coordinator_id = group_coordinator_id
680696
else:
681-
this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
697+
this_groups_coordinator_id = self._find_coordinator_id(group_id)
682698
f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id)
683699
futures.append(f)
684700

@@ -853,7 +869,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
853869
explicitly specified.
854870
"""
855871
if group_coordinator_id is None:
856-
group_coordinator_id = self._find_group_coordinator_id(group_id)
872+
group_coordinator_id = self._find_coordinator_id(group_id)
857873
future = self._list_consumer_group_offsets_send_request(
858874
group_id, group_coordinator_id, partitions)
859875
self._wait_for_futures([future])

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.