Ниже приведен минимальный реализация, которая имитирует остановку обработки пакета на полпути несколько раз перед завершением.
Код: Выделить всё
package com.topictale.base.kafka.examples
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import java.time.Duration
import java.util.*
import kotlin.concurrent.thread
private const val TOPIC_SOURCE = "source-topic"
private val TOPIC_SINK = "source-topic.${UUID.randomUUID()}"
private val GROUP_ID = UUID.randomUUID().toString()
private val TRANSACTIONAL_ID = UUID.randomUUID().toString()
var done = false
var shouldRun = false
fun main() {
val sourceCount = kafkaConsumer().countMessagesInTopic(TOPIC_SOURCE)
while (!done) {
shouldRun = true
val executor = thread { exactlyOnceConsumerToProducer() }
Thread.sleep(3_000)
shouldRun = false
println("Stopping")
executor.join()
println("Stopped")
println("Source count: $sourceCount, sink count: ${kafkaConsumer().countMessagesInTopic(TOPIC_SINK)}")
}
val sinkCount = kafkaConsumer().countMessagesInTopic(TOPIC_SINK)
println("Source count: $sourceCount, sink count: ${kafkaConsumer().countMessagesInTopic(TOPIC_SINK)}")
assert(sourceCount == sinkCount)
}
fun exactlyOnceConsumerToProducer() {
val consumer = kafkaConsumer()
val producer = kafkaProducer()
try {
consumer.subscribe(listOf(TOPIC_SOURCE))
producer.initTransactions()
while (shouldRun && !done) {
val records = consumer.poll(Duration.ofSeconds(1))
if (records.isEmpty) {
done = true
continue
}
producer.beginTransaction()
try {
for (record in records) {
producer.send(ProducerRecord(TOPIC_SINK, record.key(), record.value()))
if (!shouldRun) {
print("Interrupted on record processing")
producer.abortTransaction()
return
}
}
val transactionOffsets = records.partitions()
.associateWith { OffsetAndMetadata(records.records(it).last().offset() + 1) }
producer.sendOffsetsToTransaction(transactionOffsets, consumer.groupMetadata())
producer.commitTransaction()
} catch (e: Exception) {
producer.abortTransaction()
throw e
}
}
} finally {
println("Closing")
consumer.close()
producer.close()
}
}
private fun kafkaConsumer(): KafkaConsumer {
val consumerProperties = Properties().apply {
put("bootstrap.servers", "localhost:9092")
put("group.id", GROUP_ID)
put("max.poll.records", 1000)
put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
put("auto.offset.reset", "earliest")
put("enable.auto.commit", "false")
put("isolation.level", "read_committed")
}
val consumer = KafkaConsumer(consumerProperties)
return consumer
}
private fun kafkaProducer(): KafkaProducer {
val producerConfig = Properties().apply {
put("bootstrap.servers", "localhost:9092")
put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTIONAL_ID)
}
val producer = KafkaProducer(producerConfig)
return producer
}
fun KafkaConsumer.countMessagesInTopic(
topicPattern: String
): Long {
val partitions = topicPartitionsFor(topicPattern)
val beginningOffsets = beginningOffsets(partitions)
val endOffsets = endOffsets(partitions)
return partitions.sumOf { partition ->
val beginOffset = beginningOffsets[partition] ?: 0L
val endOffset = endOffsets[partition] ?: 0L
endOffset - beginOffset
}
}
fun KafkaConsumer.topicPartitionsFor(topic: String): List =
this.partitionsFor(topic).map { TopicPartition(it.topic(), it.partition()) }
Код: Выделить всё
Stopping
Closing
Stopped
Source count: 300000, sink count: 67067
Stopping
Closing
Stopped
Source count: 300000, sink count: 141141
Stopping
Closing
Stopped
Source count: 300000, sink count: 221221
Stopping
Closing
Stopped
Source count: 300000, sink count: 300301
Source count: 300000, sink count: 300301
Любая идеи?
Подробнее здесь: https://stackoverflow.com/questions/790 ... ot-working
Мобильная версия