Apache Kafka ровно один раз не работаетJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Apache Kafka ровно один раз не работает

Сообщение Anonymous »

Я реализую реализацию «точно один раз», используя потребителя и производителя с Java-клиентами Apache Kafka, но мне не удается заставить ее работать.
Ниже приведен минимальный реализация, которая имитирует остановку обработки пакета на полпути несколько раз перед завершением.

Код: Выделить всё

package com.topictale.base.kafka.examples

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import java.time.Duration
import java.util.*
import kotlin.concurrent.thread

private const val TOPIC_SOURCE = "source-topic"
private val TOPIC_SINK = "source-topic.${UUID.randomUUID()}"
private val GROUP_ID = UUID.randomUUID().toString()
private val TRANSACTIONAL_ID = UUID.randomUUID().toString()
var done = false
var shouldRun = false

fun main() {
val sourceCount = kafkaConsumer().countMessagesInTopic(TOPIC_SOURCE)

while (!done) {
shouldRun = true
val executor = thread { exactlyOnceConsumerToProducer() }

Thread.sleep(3_000)

shouldRun = false
println("Stopping")

executor.join()
println("Stopped")

println("Source count: $sourceCount, sink count: ${kafkaConsumer().countMessagesInTopic(TOPIC_SINK)}")
}
val sinkCount = kafkaConsumer().countMessagesInTopic(TOPIC_SINK)
println("Source count: $sourceCount, sink count: ${kafkaConsumer().countMessagesInTopic(TOPIC_SINK)}")
assert(sourceCount == sinkCount)
}

fun exactlyOnceConsumerToProducer() {
val consumer = kafkaConsumer()
val producer = kafkaProducer()

try {

consumer.subscribe(listOf(TOPIC_SOURCE))
producer.initTransactions()

while (shouldRun && !done) {
val records = consumer.poll(Duration.ofSeconds(1))
if (records.isEmpty) {
done = true
continue
}
producer.beginTransaction()

try {

for (record in records) {
producer.send(ProducerRecord(TOPIC_SINK, record.key(), record.value()))
if (!shouldRun) {
print("Interrupted on record processing")
producer.abortTransaction()
return
}
}
val transactionOffsets = records.partitions()
.associateWith { OffsetAndMetadata(records.records(it).last().offset() + 1) }
producer.sendOffsetsToTransaction(transactionOffsets, consumer.groupMetadata())

producer.commitTransaction()
} catch (e: Exception) {
producer.abortTransaction()
throw e
}
}

} finally {
println("Closing")
consumer.close()
producer.close()
}
}

private fun kafkaConsumer(): KafkaConsumer {
val consumerProperties = Properties().apply {
put("bootstrap.servers", "localhost:9092")
put("group.id", GROUP_ID)
put("max.poll.records", 1000)
put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

put("auto.offset.reset", "earliest")

put("enable.auto.commit", "false")
put("isolation.level", "read_committed")
}
val consumer = KafkaConsumer(consumerProperties)
return consumer
}

private fun kafkaProducer(): KafkaProducer {
val producerConfig = Properties().apply {
put("bootstrap.servers", "localhost:9092")
put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTIONAL_ID)
}
val producer = KafkaProducer(producerConfig)
return producer
}

fun   KafkaConsumer.countMessagesInTopic(
topicPattern: String
): Long {
val partitions = topicPartitionsFor(topicPattern)

val beginningOffsets = beginningOffsets(partitions)
val endOffsets = endOffsets(partitions)

return partitions.sumOf { partition ->
val beginOffset = beginningOffsets[partition] ?: 0L
val endOffset = endOffsets[partition] ?: 0L
endOffset - beginOffset
}
}

fun  KafkaConsumer.topicPartitionsFor(topic: String): List =
this.partitionsFor(topic).map { TopicPartition(it.topic(), it.partition()) }

Вывод этого исполняемого файла kotlin:

Код: Выделить всё

Stopping
Closing
Stopped
Source count: 300000, sink count: 67067
Stopping
Closing
Stopped
Source count: 300000, sink count: 141141
Stopping
Closing
Stopped
Source count: 300000, sink count: 221221
Stopping
Closing
Stopped
Source count: 300000, sink count: 300301
Source count: 300000, sink count: 300301
Исходная тема содержит 300 000 записей, и вы можете видеть, как я получаю 301 дополнительную запись, что показывает, что ровно один раз не работает.
Любая идеи?

Подробнее здесь: https://stackoverflow.com/questions/790 ... ot-working
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»