Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 98c0058

Browse filesBrowse files
authored
Cleanup handling of KAFKA_VERSION env var in tests (#1887)
Now that we are using `pytest`, there is no need for a custom decorator because we can use `pytest.mark.skipif()`. This makes the code significantly simpler. In particular, dropping the custom `@kafka_versions()` decorator is necessary because it uses `func.wraps()` which doesn't play nice with `pytest` fixtures: - pytest-dev/pytest#677 - https://stackoverflow.com/a/19614807/770425 So this is a pre-requisite to migrating some of those tests to using pytest fixtures.
1 parent e49caeb commit 98c0058
Copy full SHA for 98c0058
Expand file treeCollapse file tree

10 files changed

+65
-142
lines changed

‎test/conftest.py

Copy file name to clipboardExpand all lines: test/conftest.py
+4-10Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,8 @@
22

33
import pytest
44

5-
from test.fixtures import KafkaFixture, ZookeeperFixture, random_string, version as kafka_version
6-
7-
8-
@pytest.fixture(scope="module")
9-
def version():
10-
"""Return the Kafka version set in the OS environment"""
11-
return kafka_version()
12-
5+
from test.testutil import env_kafka_version, random_string
6+
from test.fixtures import KafkaFixture, ZookeeperFixture
137

148
@pytest.fixture(scope="module")
159
def zookeeper():
@@ -26,9 +20,9 @@ def kafka_broker(kafka_broker_factory):
2620

2721

2822
@pytest.fixture(scope="module")
29-
def kafka_broker_factory(version, zookeeper):
23+
def kafka_broker_factory(zookeeper):
3024
"""Return a Kafka broker fixture factory"""
31-
assert version, 'KAFKA_VERSION must be specified to run integration tests'
25+
assert env_kafka_version(), 'KAFKA_VERSION must be specified to run integration tests'
3226

3327
_brokers = []
3428
def factory(**broker_params):

‎test/fixtures.py

Copy file name to clipboardExpand all lines: test/fixtures.py
+3-22Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@
44
import logging
55
import os
66
import os.path
7-
import random
87
import socket
9-
import string
108
import subprocess
119
import time
1210
import uuid
@@ -19,29 +17,12 @@
1917
from kafka.client_async import KafkaClient
2018
from kafka.protocol.admin import CreateTopicsRequest
2119
from kafka.protocol.metadata import MetadataRequest
20+
from test.testutil import env_kafka_version, random_string
2221
from test.service import ExternalService, SpawnedService
2322

2423
log = logging.getLogger(__name__)
2524

2625

27-
def random_string(length):
28-
return "".join(random.choice(string.ascii_letters) for i in range(length))
29-
30-
31-
def version_str_to_tuple(version_str):
32-
"""Transform a version string into a tuple.
33-
34-
Example: '0.8.1.1' --> (0, 8, 1, 1)
35-
"""
36-
return tuple(map(int, version_str.split('.')))
37-
38-
39-
def version():
40-
if 'KAFKA_VERSION' not in os.environ:
41-
return ()
42-
return version_str_to_tuple(os.environ['KAFKA_VERSION'])
43-
44-
4526
def get_open_port():
4627
sock = socket.socket()
4728
sock.bind(("", 0))
@@ -477,7 +458,7 @@ def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_
477458
num_partitions == self.partitions and \
478459
replication_factor == self.replicas:
479460
self._send_request(MetadataRequest[0]([topic_name]))
480-
elif version() >= (0, 10, 1, 0):
461+
elif env_kafka_version() >= (0, 10, 1, 0):
481462
request = CreateTopicsRequest[0]([(topic_name, num_partitions,
482463
replication_factor, [], [])], timeout_ms)
483464
result = self._send_request(request, timeout=timeout_ms)
@@ -497,7 +478,7 @@ def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_
497478
'--replication-factor', self.replicas \
498479
if replication_factor is None \
499480
else replication_factor)
500-
if version() >= (0, 10):
481+
if env_kafka_version() >= (0, 10):
501482
args.append('--if-not-exists')
502483
env = self.kafka_run_class_env()
503484
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

‎test/test_client_integration.py

Copy file name to clipboardExpand all lines: test/test_client_integration.py
+4-2Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
import os
22

3+
import pytest
4+
35
from kafka.errors import KafkaTimeoutError
46
from kafka.protocol import create_message
57
from kafka.structs import (
68
FetchRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
79
ProduceRequestPayload)
810

911
from test.fixtures import ZookeeperFixture, KafkaFixture
10-
from test.testutil import KafkaIntegrationTestCase, kafka_versions
12+
from test.testutil import KafkaIntegrationTestCase, env_kafka_version
1113

1214

1315
class TestKafkaClientIntegration(KafkaIntegrationTestCase):
@@ -80,7 +82,7 @@ def test_send_produce_request_maintains_request_response_order(self):
8082
# Offset Tests #
8183
####################
8284

83-
@kafka_versions('>=0.8.1')
85+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
8486
def test_commit_fetch_offsets(self):
8587
req = OffsetCommitRequestPayload(self.topic, 0, 42, 'metadata')
8688
(resp,) = self.client.send_offset_commit_request('group', [req])

‎test/test_codec.py

Copy file name to clipboardExpand all lines: test/test_codec.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
lz4_encode_old_kafka, lz4_decode_old_kafka,
1515
)
1616

17-
from test.fixtures import random_string
17+
from test.testutil import random_string
1818

1919

2020
def test_gzip():

‎test/test_consumer_group.py

Copy file name to clipboardExpand all lines: test/test_consumer_group.py
+8-10Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@
1111
from kafka.coordinator.base import MemberState
1212
from kafka.structs import TopicPartition
1313

14-
from test.fixtures import random_string, version
14+
from test.testutil import env_kafka_version, random_string
1515

1616

1717
def get_connect_str(kafka_broker):
1818
return kafka_broker.host + ':' + str(kafka_broker.port)
1919

2020

21-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
22-
def test_consumer(kafka_broker, topic, version):
21+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
22+
def test_consumer(kafka_broker, topic):
2323
# The `topic` fixture is included because
2424
# 0.8.2 brokers need a topic to function well
2525
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
@@ -29,17 +29,16 @@ def test_consumer(kafka_broker, topic, version):
2929
assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED
3030
consumer.close()
3131

32-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
33-
def test_consumer_topics(kafka_broker, topic, version):
32+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
33+
def test_consumer_topics(kafka_broker, topic):
3434
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
3535
# Necessary to drive the IO
3636
consumer.poll(500)
3737
assert topic in consumer.topics()
3838
assert len(consumer.partitions_for_topic(topic)) > 0
3939
consumer.close()
4040

41-
@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
42-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
41+
@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
4342
def test_group(kafka_broker, topic):
4443
num_partitions = 4
4544
connect_str = get_connect_str(kafka_broker)
@@ -129,7 +128,7 @@ def consumer_thread(i):
129128
threads[c] = None
130129

131130

132-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
131+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
133132
def test_paused(kafka_broker, topic):
134133
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
135134
topics = [TopicPartition(topic, 1)]
@@ -148,8 +147,7 @@ def test_paused(kafka_broker, topic):
148147
consumer.close()
149148

150149

151-
@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
152-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
150+
@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
153151
def test_heartbeat_thread(kafka_broker, topic):
154152
group_id = 'test-group-' + random_string(6)
155153
consumer = KafkaConsumer(topic,

‎test/test_consumer_integration.py

Copy file name to clipboardExpand all lines: test/test_consumer_integration.py
+21-21Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
11
import logging
22
import os
33
import time
4-
from mock import patch
5-
import pytest
6-
import kafka.codec
74

5+
from mock import patch
86
import pytest
9-
from kafka.vendor.six.moves import range
107
from kafka.vendor import six
8+
from kafka.vendor.six.moves import range
119

1210
from . import unittest
1311
from kafka import (
1412
KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message,
1513
create_gzip_message, KafkaProducer
1614
)
15+
import kafka.codec
1716
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
1817
from kafka.errors import (
1918
ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
@@ -23,11 +22,11 @@
2322
ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
2423
)
2524

26-
from test.fixtures import ZookeeperFixture, KafkaFixture, random_string, version
27-
from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer
25+
from test.fixtures import ZookeeperFixture, KafkaFixture
26+
from test.testutil import KafkaIntegrationTestCase, Timer, env_kafka_version, random_string
2827

2928

30-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
29+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
3130
def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
3231
"""Test KafkaConsumer"""
3332
kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')
@@ -54,7 +53,7 @@ def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
5453
kafka_consumer.close()
5554

5655

57-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
56+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
5857
def test_kafka_consumer_unsupported_encoding(
5958
topic, kafka_producer_factory, kafka_consumer_factory):
6059
# Send a compressed message
@@ -211,7 +210,7 @@ def test_simple_consumer_no_reset(self):
211210
with self.assertRaises(OffsetOutOfRangeError):
212211
consumer.get_message()
213212

214-
@kafka_versions('>=0.8.1')
213+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
215214
def test_simple_consumer_load_initial_offsets(self):
216215
self.send_messages(0, range(0, 100))
217216
self.send_messages(1, range(100, 200))
@@ -388,7 +387,7 @@ def test_multi_proc_pending(self):
388387
consumer.stop()
389388

390389
@unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
391-
@kafka_versions('>=0.8.1')
390+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
392391
def test_multi_process_consumer_load_initial_offsets(self):
393392
self.send_messages(0, range(0, 10))
394393
self.send_messages(1, range(10, 20))
@@ -459,7 +458,7 @@ def test_huge_messages(self):
459458

460459
big_consumer.stop()
461460

462-
@kafka_versions('>=0.8.1')
461+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
463462
def test_offset_behavior__resuming_behavior(self):
464463
self.send_messages(0, range(0, 100))
465464
self.send_messages(1, range(100, 200))
@@ -491,7 +490,7 @@ def test_offset_behavior__resuming_behavior(self):
491490
consumer2.stop()
492491

493492
@unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
494-
@kafka_versions('>=0.8.1')
493+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
495494
def test_multi_process_offset_behavior__resuming_behavior(self):
496495
self.send_messages(0, range(0, 100))
497496
self.send_messages(1, range(100, 200))
@@ -548,6 +547,7 @@ def test_fetch_buffer_size(self):
548547
messages = [ message for message in consumer ]
549548
self.assertEqual(len(messages), 2)
550549

550+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
551551
def test_kafka_consumer__blocking(self):
552552
TIMEOUT_MS = 500
553553
consumer = self.kafka_consumer(auto_offset_reset='earliest',
@@ -586,7 +586,7 @@ def test_kafka_consumer__blocking(self):
586586
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
587587
consumer.close()
588588

589-
@kafka_versions('>=0.8.1')
589+
@pytest.mark.skipif(env_kafka_version() < (0, 8, 1), reason="Requires KAFKA_VERSION >= 0.8.1")
590590
def test_kafka_consumer__offset_commit_resume(self):
591591
GROUP_ID = random_string(10)
592592

@@ -605,7 +605,7 @@ def test_kafka_consumer__offset_commit_resume(self):
605605
output_msgs1 = []
606606
for _ in range(180):
607607
m = next(consumer1)
608-
output_msgs1.append(m)
608+
output_msgs1.append((m.key, m.value))
609609
self.assert_message_count(output_msgs1, 180)
610610
consumer1.close()
611611

@@ -621,12 +621,12 @@ def test_kafka_consumer__offset_commit_resume(self):
621621
output_msgs2 = []
622622
for _ in range(20):
623623
m = next(consumer2)
624-
output_msgs2.append(m)
624+
output_msgs2.append((m.key, m.value))
625625
self.assert_message_count(output_msgs2, 20)
626626
self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200)
627627
consumer2.close()
628628

629-
@kafka_versions('>=0.10.1')
629+
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
630630
def test_kafka_consumer_max_bytes_simple(self):
631631
self.send_messages(0, range(100, 200))
632632
self.send_messages(1, range(200, 300))
@@ -647,7 +647,7 @@ def test_kafka_consumer_max_bytes_simple(self):
647647
TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)]))
648648
consumer.close()
649649

650-
@kafka_versions('>=0.10.1')
650+
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
651651
def test_kafka_consumer_max_bytes_one_msg(self):
652652
# We send to only 1 partition so we don't have parallel requests to 2
653653
# nodes for data.
@@ -673,7 +673,7 @@ def test_kafka_consumer_max_bytes_one_msg(self):
673673
self.assertEqual(len(fetched_msgs), 10)
674674
consumer.close()
675675

676-
@kafka_versions('>=0.10.1')
676+
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
677677
def test_kafka_consumer_offsets_for_time(self):
678678
late_time = int(time.time()) * 1000
679679
middle_time = late_time - 1000
@@ -727,7 +727,7 @@ def test_kafka_consumer_offsets_for_time(self):
727727
})
728728
consumer.close()
729729

730-
@kafka_versions('>=0.10.1')
730+
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
731731
def test_kafka_consumer_offsets_search_many_partitions(self):
732732
tp0 = TopicPartition(self.topic, 0)
733733
tp1 = TopicPartition(self.topic, 1)
@@ -766,15 +766,15 @@ def test_kafka_consumer_offsets_search_many_partitions(self):
766766
})
767767
consumer.close()
768768

769-
@kafka_versions('<0.10.1')
769+
@pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1")
770770
def test_kafka_consumer_offsets_for_time_old(self):
771771
consumer = self.kafka_consumer()
772772
tp = TopicPartition(self.topic, 0)
773773

774774
with self.assertRaises(UnsupportedVersionError):
775775
consumer.offsets_for_times({tp: int(time.time())})
776776

777-
@kafka_versions('>=0.10.1')
777+
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
778778
def test_kafka_consumer_offsets_for_times_errors(self):
779779
consumer = self.kafka_consumer(fetch_max_wait_ms=200,
780780
request_timeout_ms=500)

‎test/test_failover_integration.py

Copy file name to clipboardExpand all lines: test/test_failover_integration.py
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
from kafka.producer.base import Producer
1010
from kafka.structs import TopicPartition
1111

12-
from test.fixtures import ZookeeperFixture, KafkaFixture, random_string
13-
from test.testutil import KafkaIntegrationTestCase
12+
from test.fixtures import ZookeeperFixture, KafkaFixture
13+
from test.testutil import KafkaIntegrationTestCase, random_string
1414

1515

1616
log = logging.getLogger(__name__)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.