Seems consumer is not fetching offsets correctly on start and during rebalance #2628

Closed
@roman-bartusiak-yohana

Description


Issue:

We observe random offset resets during boot and during rebalancing events. The consumer starts consuming messages according to auto.offset.reset (in our case earliest), which causes a huge reconsumption of messages and increased lag: all messages in the partition are re-read. This is random, so it is hard to reproduce, but I can provide extra logs if needed. Currently we use poll-based consumption, but the problem was also present when using the iterator. In both cases we commit offsets manually.

Log of the offset reset:

Resetting offset for partition TopicPartition(topic='pwell_dev_us-west-2_aidp-chat_completion-result', partition=160) to offset 0.

To investigate, I wrote code that observes message offsets and queries the broker for the last committed ones (this log line appears just after the reset):

Offset split detected! topic_partition:TopicPartition(topic='pwell_dev_us-west-2_aidp-chat_completion-result', partition=160) offset:7334 msg.offset:0

Here the consumer started consuming the message with offset 0, while the last committed offset for the topic partition containing that message is 7334. A minimal sketch of that check is below.
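Roughly, the check looks like this (a minimal sketch against the kafka-python API; the bootstrap server is a placeholder and only the settings relevant here are shown):

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    "pwell_dev_us-west-2_aidp-chat_completion-result",
    bootstrap_servers="localhost:9092",  # placeholder
    group_id="aidp-chat_completion-result-consumer",
    enable_auto_commit=False,
    auto_offset_reset="earliest",
)

for msg in consumer:
    tp = TopicPartition(msg.topic, msg.partition)
    # Ask the broker for the group's last committed offset for this partition.
    committed = consumer.committed(tp)
    # A delivered offset below the committed one means the fetch position was
    # reset behind the group's committed position.
    if committed is not None and msg.offset < committed:
        print(
            f"Offset split detected! topic_partition:{tp} "
            f"offset:{committed} msg.offset:{msg.offset}"
        )
```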

For now I have code that keeps track of the last committed offsets and updates them from the Kafka broker when needed; a sketch of that workaround follows.
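Something along these lines (a hedged sketch; the names last_committed and maybe_repair are illustrative, not from our actual code):

```python
from kafka import TopicPartition

# Highest committed offset we have seen per partition.
last_committed: dict[TopicPartition, int] = {}

def maybe_repair(consumer, msg):
    """Return True if msg is a stale re-delivery that was skipped over."""
    tp = TopicPartition(msg.topic, msg.partition)
    if tp not in last_committed:
        committed = consumer.committed(tp)  # refresh from the broker when unknown
        if committed is not None:
            last_committed[tp] = committed
    expected = last_committed.get(tp)
    if expected is not None and msg.offset < expected:
        consumer.seek(tp, expected)  # skip forward past the re-delivered range
        return True
    return False
```

After every manual commit we also update last_committed, so the tracked value stays current without extra broker round trips.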

Stack:

    • kafka-python: 2.2.7
    • Python 3.12
    • against local dockerized Kafka and also against Confluent Cloud
    • multiple consumers in separate threads, not sharing consumer instances; every thread has its own consumer
    • 200 partitions on the topic, but this also happened randomly when we had 6 partitions; now it happens more often
    • up to 50 listener threads; we do not observe any performance issues as our workload is IO bound
    • consumer config (see the construction sketch after this list):
      Creating Kafka consumer for topic: pwell_dev_us-west-2_aidp-chat_completion-result with config: {'client.id': 'aidp-sdk', 'bootstrap.servers': '', 'sasl.plain.username': '', 'sasl.plain.password': '', 'enable.auto.commit': False, 'partition.assignment.strategy': [<class 'kafka.coordinator.assignors.sticky.sticky_assignor.StickyPartitionAssignor'>], 'enable.incremental.fetch.sessions': False, 'api.version.auto.timeout.ms': 60000} and extra config: {'group.id': 'aidp-chat_completion-result-consumer', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False, 'key.deserializer': <function CallbackContainer.<lambda> at 0x7f5367a41c60>, 'value.deserializer': <function CallbackContainer.<lambda> at 0x7f5367a41f80>, 'max.poll.records': 150, 'max.poll.interval.ms': 30000000, 'max.partition.fetch.bytes': 100}
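For reference, roughly the equivalent kafka-python construction (a sketch: kafka-python takes snake_case kwargs rather than the dotted keys our wrapper logs, and the SASL credentials and deserializers are elided as in the log above):

```python
from kafka import KafkaConsumer
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor

consumer = KafkaConsumer(
    "pwell_dev_us-west-2_aidp-chat_completion-result",
    client_id="aidp-sdk",
    bootstrap_servers="",  # elided, as in the log
    group_id="aidp-chat_completion-result-consumer",
    enable_auto_commit=False,
    auto_offset_reset="earliest",
    partition_assignment_strategy=[StickyPartitionAssignor],
    enable_incremental_fetch_sessions=False,
    api_version_auto_timeout_ms=60000,
    max_poll_records=150,
    max_poll_interval_ms=30000000,
    max_partition_fetch_bytes=100,
)
```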

As this seems to be a race, it is hard for me to provide a reproduction case, but in general having many partitions and many listener threads locally triggers it. A sketch of that setup is below.
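Not a guaranteed reproduction, but the shape of the setup that triggers it for us looks roughly like this (a sketch with illustrative names; each thread owns its own consumer in the same group):

```python
import threading
from kafka import KafkaConsumer

def listener(n):
    consumer = KafkaConsumer(
        "pwell_dev_us-west-2_aidp-chat_completion-result",
        bootstrap_servers="localhost:9092",  # placeholder
        client_id=f"aidp-sdk-{n}",
        group_id="aidp-chat_completion-result-consumer",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
    )
    while True:
        records = consumer.poll(timeout_ms=1000)
        for tp, msgs in records.items():
            for msg in msgs:
                pass  # IO-bound processing happens here
        consumer.commit()  # manual commit of the consumed positions

# Up to 50 listener threads, each with a separate consumer instance.
threads = [threading.Thread(target=listener, args=(i,), daemon=True) for i in range(50)]
for t in threads:
    t.start()
```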
