nomisRev/kotlin-kafka

Module kotlin-kafka

Rationale

At the time of starting this repository I didn't find any bindings between the Kafka SDK and Kotlin suspension or KotlinX Coroutines Flow. These operators should be implemented at a low level, so they can guarantee correct cancellation support and highly optimised runtimes.

Some important aspects of Kafka are tricky to implement with the "low-level" Kafka API, especially properly streaming records from Kafka and correctly committing them. Additional complexity is involved in this process; more details here.

To solve these problems a couple of projects in the JVM already exist:

There was no implementation for KotlinX Coroutines Flow. You can, however, quite easily use reactor-kafka with the KotlinX Coroutines Reactor bindings.
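As a rough illustration of the reactor-kafka route, the sketch below wraps reactor-kafka's `Flux` of records in a `Flow` using `asFlow()` from `kotlinx-coroutines-reactive`. The helper name `receiveAsFlow`, its parameters, and the choice of `String` keys and values are assumptions made for this example; it is not part of this library, and it assumes `reactor-kafka`, `kafka-clients`, and `kotlinx-coroutines-reactive` are on the classpath.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import reactor.kafka.receiver.ReceiverRecord

// Hypothetical helper (not part of kotlin-kafka or reactor-kafka):
// exposes reactor-kafka's Flux of records as a cold KotlinX Coroutines Flow.
fun receiveAsFlow(
  bootstrapServers: String,
  groupId: String,
  topic: String,
): Flow<ReceiverRecord<String, String>> {
  val props: Map<String, Any> = mapOf(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
    ConsumerConfig.GROUP_ID_CONFIG to groupId,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
  )
  val options = ReceiverOptions.create<String, String>(props)
    .subscription(listOf(topic))
  // The Flux is only subscribed to once the returned Flow is collected.
  return KafkaReceiver.create(options).receive().asFlow()
}
```

When collecting such a Flow, each `ReceiverRecord` still carries reactor-kafka's `receiverOffset()`, so a record can be marked as processed with `record.receiverOffset().acknowledge()`. The bridge works, but every record passes through both the Reactor and the coroutines machinery.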

This project implements the same strategies as reactor-kafka directly on top of KotlinX Coroutines to take full advantage of them, and to open the door to potentially becoming a Kotlin Multiplatform (MPP) library in the future.

Goals

  • Lean core library built on top of Kotlin Std & KotlinX Coroutines.
  • Extensions to easily operate over the Kafka SDK with KotlinX Coroutines and suspend.
  • Flow-based operators, so you can easily compose KotlinX Flow-based Kafka programs.
  • Strong guarantees about committing record offsets, and performance optimisations with regard to re-balancing/partitioning.
  • Example for testing Kafka with Testcontainers in Kotlin.

Adding Dependency

Simply add the following dependency as implementation in the dependencies block of your build.gradle.

dependencies {
  implementation("io.github.nomisrev:kotlin-kafka:0.3.0")
}

Example

@JvmInline
value class Key(val index: Int)

@JvmInline
value class Message(val content: String)

fun main(): Unit = SuspendApp {
  val topicName = "test-topic"
  val msgCount = 10
  val kafka = Kafka.container

  Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
    client.createTopic(NewTopic(topicName, 1, 1))
  }

  launch(Dispatchers.IO) { // Send msgCount (10) messages, and then close the producer
    val settings: PublisherSettings<Key, Message> = PublisherSettings(
      kafka.bootstrapServers,
      IntegerSerializer().imap { key: Key -> key.index },
      StringSerializer().imap { msg: Message -> msg.content },
      Acks.All
    )
    KafkaPublisher(settings).use { publisher ->
      publisher.publishScope {
        (1..msgCount).forEach { index ->
          offer(ProducerRecord(topicName, Key(index), Message("msg: $index")))
        }
      }
    }
  }

  launch(Dispatchers.IO) { // Consume msgCount (10) messages as a stream, and then close the consumer
    val settings: ReceiverSettings<Key, Message> = ReceiverSettings(
      kafka.bootstrapServers,
      IntegerDeserializer().map(::Key),
      StringDeserializer().map(::Message),
      groupId = UUID.randomUUID().toString(),
      autoOffsetReset = AutoOffsetReset.Earliest
    )
    KafkaReceiver(settings)
      .receive(topicName)
      .take(msgCount)
      .map { "${it.key()} -> ${it.value()}" }
      .collect(::println)
  }
}

You can get the full code here. Running the example prints:

Key(index=1) -> Message(content=msg: 1)
Key(index=2) -> Message(content=msg: 2)
Key(index=3) -> Message(content=msg: 3)
Key(index=4) -> Message(content=msg: 4)
Key(index=5) -> Message(content=msg: 5)
Key(index=6) -> Message(content=msg: 6)
Key(index=7) -> Message(content=msg: 7)
Key(index=8) -> Message(content=msg: 8)
Key(index=9) -> Message(content=msg: 9)
Key(index=10) -> Message(content=msg: 10)
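
The example adapts Kafka's built-in serializers to the `Key` and `Message` value classes with `imap` and `map`. To make that idea concrete, here is a minimal sketch of how such adapters could be written on top of Kafka's `Serializer` and `Deserializer` interfaces. The names `imapSketch` and `mapSketch` are hypothetical and chosen to avoid clashing with the library's own functions, whose actual implementation may differ. The sketch assumes a `kafka-clients` version in which both interfaces have a single abstract method.

```kotlin
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.Serializer

@JvmInline
value class Key(val index: Int)

// Hypothetical adapter: turn a Serializer<A> into a Serializer<B>
// by first converting each B into an A. Null values are passed through.
fun <A, B : Any> Serializer<A>.imapSketch(f: (B) -> A): Serializer<B> =
  Serializer<B> { topic, data -> serialize(topic, data?.let(f)) }

// Hypothetical adapter: turn a Deserializer<A> into a Deserializer<B>
// by converting each decoded A into a B. Null results are passed through.
fun <A : Any, B> Deserializer<A>.mapSketch(f: (A) -> B): Deserializer<B> =
  Deserializer<B> { topic, bytes -> deserialize(topic, bytes)?.let(f) }

fun main() {
  val serializer = IntegerSerializer().imapSketch { key: Key -> key.index }
  val deserializer = IntegerDeserializer().mapSketch(::Key)

  // Round trip: Key -> bytes -> Key, with no broker involved.
  val bytes = serializer.serialize("test-topic", Key(42))
  val decoded = deserializer.deserialize("test-topic", bytes)
  check(decoded == Key(42))
  println(decoded) // Key(index=42)
}
```

Wrapping the existing serializers this way keeps the wire format identical to plain integers and strings, while the producing and consuming code works with the domain types only.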