Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 01053da

Browse filesBrowse files
authored
Break consumer operations into request / response methods (#1845)
This breaks some of the consumer operations into request generation / response parsing methods. The public API does not change. However, this allows power users who are willing to deal with risk of private methods changing under their feet to decouple generating the message futures from processing their responses. In other words, you can use these to fire a bunch of request at once and delay processing the responses until all requests are fired.
1 parent 91f4642 commit 01053da
Copy full SHA for 01053da

File tree

1 file changed

+155
-94
lines changed
Filter options

1 file changed

+155
-94
lines changed

‎kafka/admin/client.py

Copy file name to clipboardExpand all lines: kafka/admin/client.py
+155-94Lines changed: 155 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ def _send_request_to_controller(self, request):
349349
# one of these attributes and that they always unpack into
350350
# (topic, error_code) tuples.
351351
topic_error_tuples = (response.topic_errors if hasattr(response, 'topic_errors')
352-
else response.topic_error_codes)
352+
else response.topic_error_codes)
353353
# Also small py2/py3 compatibility -- py3 can ignore extra values
354354
# during unpack via: for x, y, *rest in list_of_values. py2 cannot.
355355
# So for now we have to map across the list and explicitly drop any
@@ -501,8 +501,8 @@ def describe_configs(self, config_resources, include_synonyms=False):
501501
future = self._send_request_to_node(self._client.least_loaded_node(), request)
502502

503503
self._wait_for_futures([future])
504-
505-
return future.value
504+
response = future.value
505+
return response
506506

507507
@staticmethod
508508
def _convert_alter_config_resource_request(config_resource):
@@ -544,8 +544,8 @@ def alter_configs(self, config_resources):
544544
future = self._send_request_to_node(self._client.least_loaded_node(), request)
545545

546546
self._wait_for_futures([future])
547-
548-
return future.value
547+
response = future.value
548+
return response
549549

550550
# alter replica logs dir protocol not yet implemented
551551
# Note: have to lookup the broker with the replica assignment and send the request to that broker
@@ -602,6 +602,54 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
602602
# describe delegation_token protocol not yet implemented
603603
# Note: send the request to the least_loaded_node()
604604

605+
def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id):
606+
"""Send a DescribeGroupsRequest to the group's coordinator.
607+
608+
:param group_id: The group name as a string
609+
:param group_coordinator_id: The node_id of the groups' coordinator
610+
broker.
611+
:return: A message future.
612+
"""
613+
version = self._matching_api_version(DescribeGroupsRequest)
614+
if version <= 1:
615+
# Note: KAFKA-6788 A potential optimization is to group the
616+
# request per coordinator and send one request with a list of
617+
# all consumer groups. Java still hasn't implemented this
618+
# because the error checking is hard to get right when some
619+
# groups error and others don't.
620+
request = DescribeGroupsRequest[version](groups=(group_id,))
621+
else:
622+
raise NotImplementedError(
623+
"Support for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient."
624+
.format(version))
625+
return self._send_request_to_node(group_coordinator_id, request)
626+
627+
def _describe_consumer_groups_process_response(self, response):
628+
"""Process a DescribeGroupsResponse into a group description."""
629+
if response.API_VERSION <= 1:
630+
assert len(response.groups) == 1
631+
# TODO need to implement converting the response tuple into
632+
# a more accessible interface like a namedtuple and then stop
633+
# hardcoding tuple indices here. Several Java examples,
634+
# including KafkaAdminClient.java
635+
group_description = response.groups[0]
636+
error_code = group_description[0]
637+
error_type = Errors.for_code(error_code)
638+
# Java has the note: KAFKA-6789, we can retry based on the error code
639+
if error_type is not Errors.NoError:
640+
raise error_type(
641+
"DescribeGroupsResponse failed with response '{}'."
642+
.format(response))
643+
# TODO Java checks the group protocol type, and if consumer
644+
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
645+
# the members' partition assignments... that hasn't yet been
646+
# implemented here so just return the raw struct results
647+
else:
648+
raise NotImplementedError(
649+
"Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."
650+
.format(response.API_VERSION))
651+
return group_description
652+
605653
def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
606654
"""Describe a set of consumer groups.
607655
@@ -622,51 +670,52 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
622670
"""
623671
group_descriptions = []
624672
futures = []
625-
version = self._matching_api_version(DescribeGroupsRequest)
626673
for group_id in group_ids:
627674
if group_coordinator_id is not None:
628675
this_groups_coordinator_id = group_coordinator_id
629676
else:
630677
this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
631-
632-
if version <= 1:
633-
# Note: KAFKA-6788 A potential optimization is to group the
634-
# request per coordinator and send one request with a list of
635-
# all consumer groups. Java still hasn't implemented this
636-
# because the error checking is hard to get right when some
637-
# groups error and others don't.
638-
request = DescribeGroupsRequest[version](groups=(group_id,))
639-
futures.append(self._send_request_to_node(this_groups_coordinator_id, request))
640-
else:
641-
raise NotImplementedError(
642-
"Support for DescribeGroups v{} has not yet been added to KafkaAdminClient."
643-
.format(version))
678+
f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id)
679+
futures.append(f)
644680

645681
self._wait_for_futures(futures)
646682

647683
for future in futures:
648684
response = future.value
649-
assert len(response.groups) == 1
650-
# TODO need to implement converting the response tuple into
651-
# a more accessible interface like a namedtuple and then stop
652-
# hardcoding tuple indices here. Several Java examples,
653-
# including KafkaAdminClient.java
654-
group_description = response.groups[0]
655-
error_code = group_description[0]
656-
error_type = Errors.for_code(error_code)
657-
# Java has the note: KAFKA-6789, we can retry based on the error code
658-
if error_type is not Errors.NoError:
659-
raise error_type(
660-
"Request '{}' failed with response '{}'."
661-
.format(request, response))
662-
# TODO Java checks the group protocol type, and if consumer
663-
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
664-
# the members' partition assignments... that hasn't yet been
665-
# implemented here so just return the raw struct results
685+
group_description = self._describe_consumer_groups_process_response(response)
666686
group_descriptions.append(group_description)
667687

668688
return group_descriptions
669689

690+
def _list_consumer_groups_send_request(self, broker_id):
691+
"""Send a ListGroupsRequest to a broker.
692+
693+
:param broker_id: The broker's node_id.
694+
:return: A message future
695+
"""
696+
version = self._matching_api_version(ListGroupsRequest)
697+
if version <= 2:
698+
request = ListGroupsRequest[version]()
699+
else:
700+
raise NotImplementedError(
701+
"Support for ListGroupsRequest_v{} has not yet been added to KafkaAdminClient."
702+
.format(version))
703+
return self._send_request_to_node(broker_id, request)
704+
705+
def _list_consumer_groups_process_response(self, response):
706+
"""Process a ListGroupsResponse into a list of groups."""
707+
if response.API_VERSION <= 2:
708+
error_type = Errors.for_code(response.error_code)
709+
if error_type is not Errors.NoError:
710+
raise error_type(
711+
"ListGroupsRequest failed with response '{}'."
712+
.format(response))
713+
else:
714+
raise NotImplementedError(
715+
"Support for ListGroupsResponse_v{} has not yet been added to KafkaAdminClient."
716+
.format(response.API_VERSION))
717+
return response.groups
718+
670719
def list_consumer_groups(self, broker_ids=None):
671720
"""List all consumer groups known to the cluster.
672721
@@ -697,60 +746,24 @@ def list_consumer_groups(self, broker_ids=None):
697746
# consumer groups move to new brokers that haven't yet been queried,
698747
# then the same group could be returned by multiple brokers.
699748
consumer_groups = set()
700-
futures = []
701749
if broker_ids is None:
702750
broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
703-
version = self._matching_api_version(ListGroupsRequest)
704-
if version <= 2:
705-
request = ListGroupsRequest[version]()
706-
for broker_id in broker_ids:
707-
futures.append(self._send_request_to_node(broker_id, request))
708-
709-
self._wait_for_futures(futures)
710-
711-
for future in futures:
712-
response = future.value
713-
error_type = Errors.for_code(response.error_code)
714-
if error_type is not Errors.NoError:
715-
raise error_type(
716-
"Request '{}' failed with response '{}'."
717-
.format(request, response))
718-
consumer_groups.update(response.groups)
719-
else:
720-
raise NotImplementedError(
721-
"Support for ListGroups v{} has not yet been added to KafkaAdminClient."
722-
.format(version))
751+
futures = [self._list_consumer_groups_send_request(b) for b in broker_ids]
752+
self._wait_for_futures(futures)
753+
for f in futures:
754+
response = f.value
755+
consumer_groups.update(self._list_consumer_groups_process_response(response))
723756
return list(consumer_groups)
724757

725-
def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
726-
partitions=None):
727-
"""Fetch Consumer Group Offsets.
728-
729-
Note:
730-
This does not verify that the group_id or partitions actually exist
731-
in the cluster.
732-
733-
As soon as any error is encountered, it is immediately raised.
758+
def _list_consumer_group_offsets_send_request(self, group_id,
759+
group_coordinator_id, partitions=None):
760+
"""Send an OffsetFetchRequest to a broker.
734761
735762
:param group_id: The consumer group id name for which to fetch offsets.
736763
:param group_coordinator_id: The node_id of the group's coordinator
737-
broker. If set to None, will query the cluster to find the group
738-
coordinator. Explicitly specifying this can be useful to prevent
739-
that extra network round trip if you already know the group
740-
coordinator. Default: None.
741-
:param partitions: A list of TopicPartitions for which to fetch
742-
offsets. On brokers >= 0.10.2, this can be set to None to fetch all
743-
known offsets for the consumer group. Default: None.
744-
:return dictionary: A dictionary with TopicPartition keys and
745-
OffsetAndMetada values. Partitions that are not specified and for
746-
which the group_id does not have a recorded offset are omitted. An
747-
offset value of `-1` indicates the group_id has no offset for that
748-
TopicPartition. A `-1` can only happen for partitions that are
749-
explicitly specified.
764+
broker.
765+
:return: A message future
750766
"""
751-
group_offsets_listing = {}
752-
if group_coordinator_id is None:
753-
group_coordinator_id = self._find_group_coordinator_id(group_id)
754767
version = self._matching_api_version(OffsetFetchRequest)
755768
if version <= 3:
756769
if partitions is None:
@@ -768,32 +781,80 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
768781
topics_partitions_dict[topic].add(partition)
769782
topics_partitions = list(six.iteritems(topics_partitions_dict))
770783
request = OffsetFetchRequest[version](group_id, topics_partitions)
771-
future = self._send_request_to_node(group_coordinator_id, request)
772-
self._wait_for_futures([future])
773-
response = future.value
784+
else:
785+
raise NotImplementedError(
786+
"Support for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient."
787+
.format(version))
788+
return self._send_request_to_node(group_coordinator_id, request)
789+
790+
def _list_consumer_group_offsets_process_response(self, response):
791+
"""Process an OffsetFetchResponse.
774792
775-
if version > 1: # OffsetFetchResponse_v1 lacks a top-level error_code
793+
:param response: an OffsetFetchResponse.
794+
:return: A dictionary composed of TopicPartition keys and
795+
OffsetAndMetada values.
796+
"""
797+
if response.API_VERSION <= 3:
798+
799+
# OffsetFetchResponse_v1 lacks a top-level error_code
800+
if response.API_VERSION > 1:
776801
error_type = Errors.for_code(response.error_code)
777802
if error_type is not Errors.NoError:
778803
# optionally we could retry if error_type.retriable
779804
raise error_type(
780-
"Request '{}' failed with response '{}'."
781-
.format(request, response))
805+
"OffsetFetchResponse failed with response '{}'."
806+
.format(response))
807+
782808
# transform response into a dictionary with TopicPartition keys and
783809
# OffsetAndMetada values--this is what the Java AdminClient returns
810+
offsets = {}
784811
for topic, partitions in response.topics:
785812
for partition, offset, metadata, error_code in partitions:
786813
error_type = Errors.for_code(error_code)
787814
if error_type is not Errors.NoError:
788815
raise error_type(
789-
"Unable to fetch offsets for group_id {}, topic {}, partition {}"
790-
.format(group_id, topic, partition))
791-
group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
816+
"Unable to fetch consumer group offsets for topic {}, partition {}"
817+
.format(topic, partition))
818+
offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
792819
else:
793820
raise NotImplementedError(
794-
"Support for OffsetFetch v{} has not yet been added to KafkaAdminClient."
795-
.format(version))
796-
return group_offsets_listing
821+
"Support for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient."
822+
.format(response.API_VERSION))
823+
return offsets
824+
825+
def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
826+
partitions=None):
827+
"""Fetch Consumer Offsets for a single consumer group.
828+
829+
Note:
830+
This does not verify that the group_id or partitions actually exist
831+
in the cluster.
832+
833+
As soon as any error is encountered, it is immediately raised.
834+
835+
:param group_id: The consumer group id name for which to fetch offsets.
836+
:param group_coordinator_id: The node_id of the group's coordinator
837+
broker. If set to None, will query the cluster to find the group
838+
coordinator. Explicitly specifying this can be useful to prevent
839+
that extra network round trip if you already know the group
840+
coordinator. Default: None.
841+
:param partitions: A list of TopicPartitions for which to fetch
842+
offsets. On brokers >= 0.10.2, this can be set to None to fetch all
843+
known offsets for the consumer group. Default: None.
844+
:return dictionary: A dictionary with TopicPartition keys and
845+
OffsetAndMetada values. Partitions that are not specified and for
846+
which the group_id does not have a recorded offset are omitted. An
847+
offset value of `-1` indicates the group_id has no offset for that
848+
TopicPartition. A `-1` can only happen for partitions that are
849+
explicitly specified.
850+
"""
851+
if group_coordinator_id is None:
852+
group_coordinator_id = self._find_group_coordinator_id(group_id)
853+
future = self._list_consumer_group_offsets_send_request(
854+
group_id, group_coordinator_id, partitions)
855+
self._wait_for_futures([future])
856+
response = future.value
857+
return self._list_consumer_group_offsets_process_response(response)
797858

798859
# delete groups protocol not yet implemented
799860
# Note: send the request to the group's coordinator.

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.