@@ -623,48 +623,28 @@ def test_kafka_consumer__offset_commit_resume(kafka_consumer_factory, send_messa


@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
-def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_messages):
-    send_messages(range(100, 200), partition=0)
-    send_messages(range(200, 300), partition=1)
-
-    # Start a consumer
-    consumer = kafka_consumer_factory(
-        auto_offset_reset='earliest', fetch_max_bytes=300)
-    seen_partitions = set()
-    for i in range(90):
-        poll_res = consumer.poll(timeout_ms=100)
-        for partition, msgs in poll_res.items():
-            for msg in msgs:
-                seen_partitions.add(partition)
+def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, topic, send_messages):
+    """Check that messages larger than fetch_max_bytes are still received.

-    # Check that we fetched at least 1 message from both partitions
-    assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}
-
-
-@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
-def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, send_messages):
-    # We send to only 1 partition so we don't have parallel requests to 2
-    # nodes for data.
-    send_messages(range(100, 200))
+    We are checking for both partition starvation and messages simply not being
+    received. The broker should reply with them, just making sure the consumer
+    isn't doing anything unexpected client-side that blocks them.
+    """
+    send_messages(range(0, 100), partition=0)
+    send_messages(range(100, 200), partition=1)

    # Start a consumer. FetchResponse_v3 should always include at least 1
    # full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time
    # But 0.11.0.0 returns 1 MessageSet at a time when the messages are
    # stored in the new v2 format by the broker.
-    #
-    # DP Note: This is a strange test. The consumer shouldn't care
-    # how many messages are included in a FetchResponse, as long as it is
-    # non-zero. I would not mind if we deleted this test. It caused
-    # a minor headache when testing 0.11.0.0.
-    group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5)
-    consumer = kafka_consumer_factory(
-        group_id=group,
-        auto_offset_reset='earliest',
-        consumer_timeout_ms=5000,
-        fetch_max_bytes=1)
+    consumer = kafka_consumer_factory(auto_offset_reset='earliest', fetch_max_bytes=1)
+
+    messages = [next(consumer) for i in range(10)]
+    assert_message_count(messages, 10)

-    fetched_msgs = [next(consumer) for i in range(10)]
-    assert_message_count(fetched_msgs, 10)
+    # Check that we fetched at least 1 message from both partitions
+    seen_partitions = {(m.topic, m.partition) for m in messages}
+    assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}


@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
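A rough standalone sketch of what the consolidated test exercises, outside the pytest fixtures. The broker address, topic name, and two-partition layout are assumptions for illustration, not part of this change:

from kafka import KafkaConsumer, TopicPartition

# Hypothetical topic with two partitions, each already holding messages
# far larger than the one-byte fetch budget configured below.
consumer = KafkaConsumer(
    'example-topic',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,
    fetch_max_bytes=1,
)

# Per the comments above, the broker returns at least one full message
# (or one v2-format MessageSet) per fetch, so iteration still makes progress.
messages = [next(consumer) for _ in range(10)]
assert len(messages) == 10

# Neither partition should be starved by the tiny fetch budget.
seen = {TopicPartition(m.topic, m.partition) for m in messages}
assert seen == {TopicPartition('example-topic', 0),
                TopicPartition('example-topic', 1)}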
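One detail worth noting in the new assertion: seen_partitions is built from plain (m.topic, m.partition) tuples but compared against a set of TopicPartition instances. That works because TopicPartition is a namedtuple, and namedtuples hash and compare equal to ordinary tuples with the same contents. A minimal sketch, with 'example-topic' as a placeholder name:

from kafka import TopicPartition

tp = TopicPartition('example-topic', 0)
assert tp == ('example-topic', 0)      # namedtuple equality is tuple equality
assert {('example-topic', 0)} == {tp}  # hashes match too, so sets compare equal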