Description
A bit of a context question: what is the optimal consumption pattern if I have more than one topic, and possibly multiple partitions per topic, to be handled by a single application instance? I feel like doing it this way is an anti-pattern:
```python
...
consumer = Consumer(config)
consumer.subscribe(["topic1", "topic2"])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    elif msg.error():
        print('error: {}'.format(msg.error()))
    elif msg.topic() == "topic1":
        ...  # topic1 logic
    elif msg.topic() == "topic2":
        ...  # topic2 logic
...
```
Let's assume that the topic1 logic is a lightweight filter/repartition (discard most of the stream, rehash a new key, and publish to topic2) and the topic2 logic is I/O-bound. This seems counterproductive; it's like serving a grocery and a pharmacy from a single queue. Now, how do we optimize this?
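To make it concrete, the topic1 branch might look roughly like this; `keep` and `rehash_key` are hypothetical placeholders for the filter and the rehashing, not code I actually have:

```python
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})


def handle_topic1(msg):
    # Hypothetical predicate: discard most of the stream.
    if not keep(msg.value()):
        return
    # Hypothetical rehash: derive a new key and republish to topic2.
    producer.produce("topic2", key=rehash_key(msg.value()), value=msg.value())
    producer.poll(0)  # serve delivery callbacks without blocking
```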
I have read this mind-blowing blog post: https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/

I figure that key-level parallelism is something of a holy grail, not available in the Python world as of now. But first things first: a good-enough step would be to shuffle each topic into a separate `Consumer` instance, hopefully with `asyncio` rather than `multiprocessing`.
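For what it's worth, I imagine a poor man's key-level parallelism could be sketched on top of asyncio queues, something like this (purely illustrative; `handler` stands for an assumed per-message coroutine):

```python
import asyncio

# Illustrative sketch only: messages sharing a key are handled in
# order by a dedicated worker, while distinct keys make progress
# concurrently. `handler` is an assumed coroutine with the logic.

async def worker(queue: asyncio.Queue, handler):
    while True:
        msg = await queue.get()
        await handler(msg)
        queue.task_done()


async def dispatch(queues: dict, msg, handler):
    key = msg.key()
    if key not in queues:
        # Note: queues and workers grow without bound here; a real
        # implementation would cap and reap idle ones.
        queues[key] = asyncio.Queue()
        asyncio.create_task(worker(queues[key], handler))
    await queues[key].put(msg)
```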
I have read through the related issues #185 and #100, the famous blog post https://www.confluent.io/blog/kafka-python-asyncio-integration/, and several other resources I can no longer find among my gazillion open browser tabs 😆
I have come up with a snippet of code that I am kindly requesting a review of. It generally works in a local environment, but I wonder what you think: does this approach make sense, or is it a disaster waiting for the moment I put some serious load on it? Thanks in advance.
```python
import asyncio
import functools
import logging
import signal
import sys

import confluent_kafka

signal.signal(signal.SIGTERM, lambda *args: sys.exit())

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

config = {
    "group.id": "consumer-group-name",
    "bootstrap.servers": "localhost:9092",
}


async def consume(config, topic):
    consumer = confluent_kafka.Consumer(config)
    consumer.subscribe([topic])
    loop = asyncio.get_running_loop()
    # Run the blocking poll() in the default thread pool so that
    # one consumer cannot starve the event loop.
    poll = functools.partial(consumer.poll, 0.1)
    try:
        log.info(f"Starting consumer: {topic}")
        while True:
            message = await loop.run_in_executor(None, poll)
            if message is None:
                continue
            if message.error():
                log.error(f"Consumer error: {message.error()}")
                continue
            # TODO: Inject topic-specific logic here
            log.info(f"Consuming message: {message.value()}")
    finally:
        log.info(f"Closing consumer: {topic}")
        consumer.close()


consume = functools.partial(consume, config)


async def main():
    await asyncio.gather(
        consume("topic1"),
        consume("topic2"),
    )


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except (KeyboardInterrupt, SystemExit):
        log.info("Application shutdown complete")
```
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`): `('1.5.0', 17104896)` and `('1.5.0', 17105151)`, respectively
- Apache Kafka broker version: confluentinc/cp-kafka:6.0.0 locally and Confluent Cloud clusters in production
- Client configuration: very minimal for starters, available in the snippet
- Operating system: Python 3.8.5 on macOS, but production apps usually run on Buster images
- Provide client logs (with `'debug': '..'` as necessary)
- Provide broker log excerpts
- Critical issue