Я работаю над образцом реактивного приложения kafka, которое будет читать из нескольких разделов (в моем случае 5 раздела) темы Kafka, одновременно обрабатывая записи, которые должны быть заказаны разделами и впоследствии публикуют их в другую тему. Я взял ссылку по этой ссылке, чтобы написать код. @Bean
Map kafkaConsumerConfiguration() {
Map configuration = new HashMap();
configuration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configuration.put(ConsumerConfig.GROUP_ID_CONFIG, "sampleGroupId");
configuration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configuration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configuration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return configuration;
}
@Bean
ReceiverOptions kafkaReceiverOptions(@Value("${kafka.topic.in}") String inTopicName) {
ReceiverOptions options = ReceiverOptions.create(kafkaConsumerConfiguration());
return options.addAssignListener(assignments -> log.info("Assigned: " + assignments))
.subscription(Collections.singletonList(inTopicName));
}
@Bean
KafkaReceiver reactiveKafkaReceiver(ReceiverOptions kafkaReceiverOptions) {
return KafkaReceiver.create(kafkaReceiverOptions);
}
@EventListener(ApplicationStartedEvent.class)
public void onMessage() {
reactiveKafkaReceiver
.receive()
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(scheduler)
.map(r -> processRecord(partitionFlux.key(), r))
.sample(Duration.ofMillis(5000))
.concatMap(offset -> offset.commit()))
.subscribe();
}
< /code>
При запуске приложения, из журналов я заметил, что планировщик создал 5 потоков по одному для каждого раздела. Каждый поток отвечает за потребление события из этого раздела.
Проблема я здесь сталкиваюсь здесь, что все 5 потоков не работают одновременно и, следовательно, неспособны потреблять из всех частиц параллельно, даже если каждый из разделов имеет 1000 рекордов, которые будут обработаны. /> может кто -нибудь, пожалуйста, помогите мне с тем, что мне здесь не хватает? Или как мы можем прочитать со всех разделов параллельно, а также позаботиться о заказе в разделе.
Подробнее здесь: https://stackoverflow.com/questions/748 ... isnt-worki
Одновременная обработка с упорядочением на основе разделов в реакторе Кафка не работает, как и ожидалось ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Запись видео и одновременная обработка кадров в Android с помощью CameraX
Гость » » в форуме Android - 0 Ответы
- 46 Просмотры
-
Последнее сообщение Гость
-
-
-
Неожиданные типы в Pycharm меньше, чем функция с классом данных и общим упорядочением
Anonymous » » в форуме Python - 0 Ответы
- 14 Просмотры
-
Последнее сообщение Anonymous
-