-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Introduce share consumer factories for Kafka Queues (Early Access) #3923
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce share consumer factories for Kafka Queues (Early Access) #3923
Conversation
- Preliminary set of changes to support Kafka queueus introduced via KIP-932 (Kafka Queue) for early access in Apache Kafka 4.0.0. See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka - Add ShareConsumerFactory interface and DefaultShareConsumerFactory implementation as the initial building blocks for supporting Kafka Queues (KIP-932) in Spring for Apache Kafka 4.0.x. This factory and the implementation provide a flexible API for creating share consumers, and are designed as the foundation for further queue integration. - Tests to verify the share consumer behavior Related to spring-projects#3875 spring-projects#3875 Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very cool!
Just a couple nit-picks.
Thanks
* @return the share consumer. | ||
*/ | ||
ShareConsumer<K, V> createShareConsumer(@Nullable String groupId, @Nullable String clientIdPrefix, | ||
@Nullable String clientIdSuffix); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have some explanation in JavaDocs why we just cannot use clientId
?
I know this is something existing from the ConsumerFactory
, but still not clear from there why append
or override
with prefix and suffix concatenation instead of just simple clientId
parameter.
Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, I removed prefix and suffix usage from this PR and resorted to clientId
in this iteration. I will evaluate the significance of a flexible client.id
for the queue feature and implement it later.
* Return whether deserializers are configured automatically. | ||
* @return true if deserializers are configured automatically | ||
*/ | ||
public boolean isConfigureDeserializers() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this method to be exposed?
spring-kafka/src/main/java/org/springframework/kafka/core/DefaultShareConsumerFactory.java
Show resolved
Hide resolved
topics = {"embedded-share-test"}, partitions = 1, | ||
brokerProperties = { | ||
"unstable.api.versions.enable=true", | ||
"group.coordinator.rebalance.protocols=classic,consumer,share", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to enable all the protocols if we talk in this test suite only about share
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, since share
is an early access feature, kafka brokers also require the classic
value. Without it, the test fails. Once the feature is fully ready, maybe they will lift that requirement.
"share.coordinator.state.topic.min.isr=1" | ||
}) | ||
@SpringJUnitConfig | ||
@DirtiesContext |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need Spring environment if there is no @Configuration
for this test suite?
I even surprised that it works at all without @Configuration
😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without it, the autowiring of EmbeddedKafka
fails. Feel free to fix it on merge if there is a clever way to autoconfigure embeddedKafka that I am missing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We mention that in docs: https://docs.spring.io/spring-kafka/reference/testing.html#embedded-kafka-junit5.
@Test
public void test(EmbeddedKafkaBroker broker) {
String brokerList = broker.getBrokersAsString();
...
}
spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java
Outdated
Show resolved
Hide resolved
* </p> | ||
* @param index the index (list position). | ||
* @param listener the listener to add. | ||
* @since 2.5 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably we don't need this @since
since the whole class is new in this 4.0
version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
copy/paste error :)
// For this test: force new share groups to start from the beginning of the topic. | ||
// This is NOT the same as the usual consumer auto.offset.reset; it's a group config, | ||
// so use AdminClient to set share.auto.offset.reset = earliest for our test group. | ||
try (AdminClient ignored = AdminClient.create(adminProperties)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a warning on this:
/home/runner/work/spring-kafka/spring-kafka/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java:167: warning: [try] auto-closeable resource ignored is never referenced in body of corresponding try statement
try (AdminClient ignored = AdminClient.create(adminProperties)) {
^
I guess, has to be marked with @SuppressWarnings("try")
Preliminary set of changes to support Kafka queueus introduced via KIP-932 (Kafka Queue) for early access in Apache Kafka 4.0.0. See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
Add ShareConsumerFactory interface and DefaultShareConsumerFactory implementation as the initial building blocks for supporting Kafka Queues (KIP-932) in Spring for Apache Kafka 4.0.x. This factory and the implementation provide a flexible API for creating share consumers, and are designed as the foundation for further queue integration.
Tests to verify the share consumer behavior
Related to #3875
#3875