Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e5683ed

Browse files
committed
Merge two tests that are very similar
Previously the `test_kafka_consumer_max_bytes_simple()` was seeing occasional test failures because it was doing only 10 iterations. And much of the purpose of it was gutted when Kafka 0.11 came out and changed the behavior. So this merges the two tests into one which should be relatively straightforward. Further discussion in https://github.com/dpkp/kafka-python/pull/1886/files#r316860737
1 parent 61fa0b2 commit e5683ed
Copy full SHA for e5683ed

File tree

1 file changed

+15
-35
lines changed
Filter options

1 file changed

+15
-35
lines changed

‎test/test_consumer_integration.py

Copy file name to clipboard
Expand all lines: test/test_consumer_integration.py
+15 −35
Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -623,48 +623,28 @@ def test_kafka_consumer__offset_commit_resume(kafka_consumer_factory, send_messa
623623

624624

625625
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
626-
def test_kafka_consumer_max_bytes_simple(kafka_consumer_factory, topic, send_messages):
627-
send_messages(range(100, 200), partition=0)
628-
send_messages(range(200, 300), partition=1)
629-
630-
# Start a consumer
631-
consumer = kafka_consumer_factory(
632-
auto_offset_reset='earliest', fetch_max_bytes=300)
633-
seen_partitions = set()
634-
for i in range(90):
635-
poll_res = consumer.poll(timeout_ms=100)
636-
for partition, msgs in poll_res.items():
637-
for msg in msgs:
638-
seen_partitions.add(partition)
626+
def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, topic, send_messages):
627+
"""Check that messages larger than fetch_max_bytes are still received.
639628
640-
# Check that we fetched at least 1 message from both partitions
641-
assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}
642-
643-
644-
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
645-
def test_kafka_consumer_max_bytes_one_msg(kafka_consumer_factory, send_messages):
646-
# We send to only 1 partition so we don't have parallel requests to 2
647-
# nodes for data.
648-
send_messages(range(100, 200))
629+
We are checking for both partition starvation and messages simply not being
630+
received. The broker should reply with them, just making sure the consumer
631+
isn't doing anything unexpected client-side that blocks them.
632+
"""
633+
send_messages(range(0, 100), partition=0)
634+
send_messages(range(100, 200), partition=1)
649635

650636
# Start a consumer. FetchResponse_v3 should always include at least 1
651637
# full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time
652638
# But 0.11.0.0 returns 1 MessageSet at a time when the messages are
653639
# stored in the new v2 format by the broker.
654-
#
655-
# DP Note: This is a strange test. The consumer shouldn't care
656-
# how many messages are included in a FetchResponse, as long as it is
657-
# non-zero. I would not mind if we deleted this test. It caused
658-
# a minor headache when testing 0.11.0.0.
659-
group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5)
660-
consumer = kafka_consumer_factory(
661-
group_id=group,
662-
auto_offset_reset='earliest',
663-
consumer_timeout_ms=5000,
664-
fetch_max_bytes=1)
640+
consumer = kafka_consumer_factory(auto_offset_reset='earliest', fetch_max_bytes=1)
641+
642+
messages = [next(consumer) for i in range(10)]
643+
assert_message_count(messages, 10)
665644

666-
fetched_msgs = [next(consumer) for i in range(10)]
667-
assert_message_count(fetched_msgs, 10)
645+
# Check that we fetched at least 1 message from both partitions
646+
seen_partitions = {(m.topic, m.partition) for m in messages}
647+
assert seen_partitions == {TopicPartition(topic, 0), TopicPartition(topic, 1)}
668648

669649

670650
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.