Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Introduce share consumer factories for Kafka Queues (Early Access) #3923

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

sobychacko
Copy link
Contributor

  • Preliminary set of changes to support Kafka queueus introduced via KIP-932 (Kafka Queue) for early access in Apache Kafka 4.0.0. See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka

  • Add ShareConsumerFactory interface and DefaultShareConsumerFactory implementation as the initial building blocks for supporting Kafka Queues (KIP-932) in Spring for Apache Kafka 4.0.x. This factory and the implementation provide a flexible API for creating share consumers, and are designed as the foundation for further queue integration.

  • Tests to verify the share consumer behavior

Related to #3875

#3875

- Preliminary set of changes to support Kafka queueus introduced via KIP-932 (Kafka Queue)
  for early access in Apache Kafka 4.0.0.
  See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka

- Add ShareConsumerFactory interface and DefaultShareConsumerFactory implementation as the
  initial building blocks for supporting Kafka Queues (KIP-932) in Spring for Apache Kafka 4.0.x.
  This factory and the implementation provide a flexible API for creating share consumers,
  and are designed as the foundation for further queue integration.

- Tests to verify the share consumer behavior

Related to spring-projects#3875

spring-projects#3875

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
@sobychacko sobychacko added this to the 4.0.0-M3 milestone May 23, 2025
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very cool!
Just a couple nit-picks.
Thanks

* @return the share consumer.
*/
ShareConsumer<K, V> createShareConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
@Nullable String clientIdSuffix);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have some explanation in JavaDocs why we just cannot use clientId?
I know this is something existing from the ConsumerFactory, but still not clear from there why append or override with prefix and suffix concatenation instead of just simple clientId parameter.
Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I removed prefix and suffix usage from this PR and resorted to clientId in this iteration. I will evaluate the significance of a flexible client.id for the queue feature and implement it later.

* Return whether deserializers are configured automatically.
* @return true if deserializers are configured automatically
*/
public boolean isConfigureDeserializers() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this method to be exposed?

topics = {"embedded-share-test"}, partitions = 1,
brokerProperties = {
"unstable.api.versions.enable=true",
"group.coordinator.rebalance.protocols=classic,consumer,share",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to enable all the protocols if we talk in this test suite only about share?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, since share is an early access feature, kafka brokers also require the classic value. Without it, the test fails. Once the feature is fully ready, maybe they will lift that requirement.

"share.coordinator.state.topic.min.isr=1"
})
@SpringJUnitConfig
@DirtiesContext
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need Spring environment if there is no @Configuration for this test suite?
I even surprised that it works at all without @Configuration 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without it, the autowiring of EmbeddedKafka fails. Feel free to fix it on merge if there is a clever way to autoconfigure embeddedKafka that I am missing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We mention that in docs: https://docs.spring.io/spring-kafka/reference/testing.html#embedded-kafka-junit5.


    @Test
    public void test(EmbeddedKafkaBroker broker) {
        String brokerList = broker.getBrokersAsString();
        ...
    }

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
* </p>
* @param index the index (list position).
* @param listener the listener to add.
* @since 2.5
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we don't need this @since since the whole class is new in this 4.0 version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy/paste error :)

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
// For this test: force new share groups to start from the beginning of the topic.
// This is NOT the same as the usual consumer auto.offset.reset; it's a group config,
// so use AdminClient to set share.auto.offset.reset = earliest for our test group.
try (AdminClient ignored = AdminClient.create(adminProperties)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a warning on this:

 /home/runner/work/spring-kafka/spring-kafka/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java:167: warning: [try] auto-closeable resource ignored is never referenced in body of corresponding try statement
		try (AdminClient ignored = AdminClient.create(adminProperties)) {
		                 ^

I guess, has to be marked with @SuppressWarnings("try")

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
@artembilan artembilan merged commit 17fedf6 into spring-projects:main May 28, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants
Morty Proxy This is a proxified and sanitized view of the page, visit original site.