assign() does NOT work inside on_assign-Callback if assignor is configured/seek() does NOT work inside on_assign-Callback #1669

Open
@Kogl1n

Hello!
My use case is consuming a certain time frame from a single topic with multiple consumers, in a way that still behaves sensibly when a consumer fails (cooperative sticky assignor). For now, though, I am using just one consumer with one partition.
So I use offsets_for_times to get offsets for the start timestamps and try to seek them.
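One detail worth checking in this step is the timestamp conversion itself. offsets_for_times expects milliseconds since the Unix epoch, and Python interprets a naive datetime as local time, so the result can shift by the machine's UTC offset. The following is a minimal sketch, assuming START_DT is meant to be a timezone-aware datetime; the helper name to_epoch_ms is hypothetical and not part of confluent-kafka-python.

```python
from datetime import datetime, timezone


def to_epoch_ms(dt):
    """Convert a timezone-aware datetime to integer milliseconds since the epoch.

    to_epoch_ms is a hypothetical helper for illustration only.
    """
    if dt.tzinfo is None:
        # A naive datetime would be interpreted in the local timezone,
        # which silently changes the timestamp that gets looked up.
        raise ValueError("expected a timezone-aware datetime")
    return int(dt.timestamp() * 1000)


# Example: midnight UTC on 2024-01-01
start_ms = to_epoch_ms(datetime(2024, 1, 1, tzinfo=timezone.utc))
print(start_ms)
```

The resulting integer is what goes into the offset field of each TopicPartition passed to offsets_for_times.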

@edenhill commented in #373 that one shouldn't use (poll and) seek after subscribe, since subscribe is asynchronous. One should use the following instead:

    def on_assign(consumer, partitions):
        for p in partitions:
            # some starting offset, or use OFFSET_BEGINNING, et al.
            # the default offset is STORED which means use committed offsets, and if
            # no committed offsets are available use auto.offset.reset config (default latest)
            p.offset = 1234
        # call assign() to start fetching the given partitions.
        consumer.assign(partitions)

    consumer.subscribe(mytopics, on_assign=on_assign)

This DOES NOT work, though, if an assignor is configured, e.g. conf['partition.assignment.strategy'] = 'cooperative-sticky':
cimpl.KafkaException: KafkaError{code=_STATE,val=-172,str="Failed to set assignment: Local : Erroneous state"}
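A possible explanation for the _STATE error, offered as an assumption rather than a confirmed diagnosis: with a cooperative strategy the rebalance is incremental, and the client exposes incremental_assign() (and incremental_unassign()) on the Consumer for that case, while assign() is meant for the eager protocol. The sketch below adapts the callback above accordingly. The factory name make_cooperative_on_assign is hypothetical, and the callback is written against whatever consumer object is passed in, so it does not import confluent_kafka itself.

```python
def make_cooperative_on_assign(start_offset):
    """Build an on_assign callback for a cooperative assignor (hypothetical helper)."""

    def on_assign(consumer, partitions):
        # With a cooperative strategy the callback is expected to receive
        # only the partitions being added in this rebalance.
        for p in partitions:
            # Same idea as the callback above: choose the starting offset
            # before handing the partitions back to the consumer.
            p.offset = start_offset
        # Add the partitions to the current assignment instead of
        # replacing the whole assignment with assign().
        consumer.incremental_assign(partitions)

    return on_assign
```

Assuming a consumer configured with 'partition.assignment.strategy': 'cooperative-sticky', it would be registered as consumer.subscribe(mytopics, on_assign=make_cooperative_on_assign(1234)).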

That seems reasonable, since the assignor assigns the partitions. But the following version with seek DOES NOT work either (neither with nor without an assignor):

    def on_assign(c, partitions):
        partition_to_timestamp_mapping = {topic_partition.partition: int(START_DT.timestamp()*1000) for topic_partition in partitions}
        topic_partitions_with_new_offsets = c.offsets_for_times([TopicPartition(TOPICS[0], partition, partition_to_timestamp_mapping[partition]) for partition in partition_to_timestamp_mapping.keys()])
        for topic_partition in topic_partitions_with_new_offsets:
            c.seek(topic_partition)

(I read in an issue that calling assign is not necessary. The same code, used in the non-recommended way (poll and seek after subscribe), works flawlessly.)

The seek version fails with: cimpl.KafkaException: KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="Failed to seek to offset 1524: Local: Unknown partition"}

Even a minimal example that passes the provided partitions to seek fails:

    for topic_partition in [TopicPartition(TOPICS[0], partition.partition, OFFSET_BEGINNING) for partition in partitions]:
        c.seek(topic_partition)
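One way to avoid seek() inside the callback entirely, sketched here under assumptions, is to resolve the timestamp with offsets_for_times and hand the returned partitions (which already carry the resolved offsets) straight to assign() or incremental_assign(). This presumes that seek() only succeeds on partitions that are already being fetched, which may not yet be the case while on_assign is running; that is a guess at the cause of _UNKNOWN_PARTITION, not a confirmed one. The factory name make_timestamp_on_assign and its cooperative flag are hypothetical.

```python
def make_timestamp_on_assign(start_ms, cooperative=False, timeout=10.0):
    """Build an on_assign callback that starts consuming at a timestamp.

    Hypothetical helper: start_ms is milliseconds since the epoch, and
    cooperative selects incremental_assign() over assign().
    """

    def on_assign(consumer, partitions):
        for p in partitions:
            # offsets_for_times reads the timestamp to look up from .offset
            p.offset = start_ms
        resolved = consumer.offsets_for_times(partitions, timeout)

        # Each returned partition may carry a per-partition error.
        failed = [p for p in resolved if getattr(p, "error", None) is not None]
        if failed:
            raise RuntimeError(f"offset lookup failed for: {failed}")

        # The resolved offsets become the starting positions; no seek() needed.
        if cooperative:
            consumer.incremental_assign(resolved)
        else:
            consumer.assign(resolved)

    return on_assign
```

Registered as consumer.subscribe(TOPICS, on_assign=make_timestamp_on_assign(int(START_DT.timestamp() * 1000))), this keeps the offset decision inside the rebalance callback, in line with the advice from #373. Note that offsets_for_times blocks for up to the given timeout while the callback runs.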

I am using up-to-date versions of the Python library and the Kafka images:

  • confluent-kafka-python version: 2.3.0
  • Apache Kafka broker version: 7.5.1-1-ubi8 via Docker Compose

Any help is much appreciated! Thank you!
