Description
Issue:
We observe random offset resets during boot and during rebalancing events. The consumer then starts consuming according to `auto.offset.reset` - `earliest` in our case - which causes a huge reconsumption of messages and increased lag, since the whole partition is re-read. It is random, so it is hard to reproduce, but I can provide extra logs if needed. We currently use poll-based consumption, but the issue was also present when using the iterator. In both cases we commit offsets manually.
Log of the offset reset:
Resetting offset for partition TopicPartition(topic='pwell_dev_us-west-2_aidp-chat_completion-result', partition=160) to offset 0.
To investigate, I added code that observes message offsets and queries the broker for the last committed ones (the following log line is emitted just after the reset):
Offset split detected! topic_partition:TopicPartition(topic='pwell_dev_us-west-2_aidp-chat_completion-result', partition=160) offset:7334 msg.offset:0
Here the consumer started consuming a message with offset 0, while the last committed offset for the topic partition containing that message is 7334.
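For reference, the check is roughly the following (a sketch of the code that produces the log above; the function name and logger setup are mine):

```python
import logging
from kafka import KafkaConsumer, TopicPartition

log = logging.getLogger(__name__)

def check_offset_split(consumer: KafkaConsumer, msg) -> bool:
    """Compare a freshly polled record's offset with the offset last
    committed to the broker for the same partition."""
    tp = TopicPartition(msg.topic, msg.partition)
    committed = consumer.committed(tp)  # queries the group coordinator
    if committed is not None and msg.offset < committed:
        log.error(
            "Offset split detected! topic_partition:%s offset:%s msg.offset:%s",
            tp, committed, msg.offset,
        )
        return True
    return False
```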
As a workaround, I now have code that keeps track of the last committed offsets and refreshes them from the broker when needed.
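A sketch of that workaround is below (names are mine; this is a band-aid, not a proposed fix):

```python
from kafka import KafkaConsumer, TopicPartition

class CommittedOffsetGuard:
    """Workaround sketch: cache the last committed offset per partition and,
    when a polled record falls below it, seek back to the cached position
    so we do not reprocess the whole partition."""

    def __init__(self, consumer: KafkaConsumer):
        self._consumer = consumer
        self._committed: dict[TopicPartition, int] = {}

    def record_commit(self, tp: TopicPartition, offset: int) -> None:
        # Call this after every successful manual commit.
        self._committed[tp] = offset

    def is_reset_duplicate(self, msg) -> bool:
        """Return True if the record sits below the last committed offset."""
        tp = TopicPartition(msg.topic, msg.partition)
        committed = self._committed.get(tp)
        if committed is None:
            # Lazily refresh the cache from the broker when needed.
            committed = self._consumer.committed(tp)
            if committed is not None:
                self._committed[tp] = committed
        if committed is not None and msg.offset < committed:
            self._consumer.seek(tp, committed)  # skip the already-processed range
            return True
        return False
```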
- Stack
- kafka-python: 2.2.7
- Python 3.12
- against local dockerized Kafka and also against Confluent Cloud
- multiple consumers in separate threads - consumer instances are not shared, every thread has its own consumer
- 200 partitions on the topic - this also happened randomly when we had 6 partitions, but now it happens more often
- up to 50 listener threads - we do not observe any performance issues as our work is IO bound
- consumer config:
Creating Kafka consumer for topic: pwell_dev_us-west-2_aidp-chat_completion-result with config: {'client.id': 'aidp-sdk', 'bootstrap.servers': '', 'sasl.plain.username': '', 'sasl.plain.password': '', 'enable.auto.commit': False, 'partition.assignment.strategy': [<class 'kafka.coordinator.assignors.sticky.sticky_assignor.StickyPartitionAssignor'>], 'enable.incremental.fetch.sessions': False, 'api.version.auto.timeout.ms': 60000} and extra config: {'group.id': 'aidp-chat_completion-result-consumer', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False, 'key.deserializer': <function CallbackContainer.<lambda> at 0x7f5367a41c60>, 'value.deserializer': <function CallbackContainer.<lambda> at 0x7f5367a41f80>, 'max.poll.records': 150, 'max.poll.interval.ms': 30000000, 'max.partition.fetch.bytes': 100}
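The dot-separated keys above come from our own SDK wrapper; to my understanding they roughly correspond to the kafka-python constructor arguments below (broker address and deserializers are placeholders, and I have left out enable.incremental.fetch.sessions and the SASL/security settings since I am not sure of their exact mapping):

```python
from kafka import KafkaConsumer
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor

# Rough kafka-python equivalent of the logged config; broker address and
# deserializers are placeholders, SASL/security settings are omitted.
consumer = KafkaConsumer(
    "pwell_dev_us-west-2_aidp-chat_completion-result",
    client_id="aidp-sdk",
    bootstrap_servers="<broker>",
    group_id="aidp-chat_completion-result-consumer",
    enable_auto_commit=False,
    auto_offset_reset="earliest",
    partition_assignment_strategy=[StickyPartitionAssignor],
    api_version_auto_timeout_ms=60000,
    max_poll_records=150,
    max_poll_interval_ms=30000000,
    max_partition_fetch_bytes=100,
    key_deserializer=lambda raw: raw,    # placeholder for our real deserializer
    value_deserializer=lambda raw: raw,  # placeholder for our real deserializer
)
```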
As this seems to be a race, it is hard for me to provide a reproduction case, but running locally with many partitions and many listener threads is what triggers it most often.
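A rough outline of the local setup that triggers it most reliably - a sketch, not a guaranteed reproduction; topic, broker address and thread count are placeholders:

```python
import threading
from kafka import KafkaConsumer

TOPIC = "repro-topic"          # placeholder; create it with ~200 partitions
GROUP = "repro-group"
NUM_THREADS = 50               # roughly our listener count

def listener(idx: int) -> None:
    # Each thread owns its own consumer instance; nothing is shared between threads.
    consumer = KafkaConsumer(
        TOPIC,
        bootstrap_servers="localhost:9092",
        client_id=f"repro-{idx}",
        group_id=GROUP,
        enable_auto_commit=False,
        auto_offset_reset="earliest",
    )
    try:
        while True:
            records = consumer.poll(timeout_ms=1000)
            if records:
                # We commit manually after processing, as in the real application.
                consumer.commit()
    finally:
        consumer.close()

threads = [threading.Thread(target=listener, args=(i,), daemon=True) for i in range(NUM_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The resets tend to show up while the group is rebalancing, i.e. while the threads are still starting up or after one of them restarts.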