Одновременная обработка с упорядочением на основе разделов в реакторе Кафка не работает, как и ожидалосьJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Одновременная обработка с упорядочением на основе разделов в реакторе Кафка не работает, как и ожидалось

Сообщение Anonymous »

Я работаю над образцом реактивного приложения kafka, которое будет читать из нескольких разделов (в моем случае 5 раздела) темы Kafka, одновременно обрабатывая записи, которые должны быть заказаны разделами и впоследствии публикуют их в другую тему. Я взял ссылку по этой ссылке, чтобы написать код. @Bean
Map kafkaConsumerConfiguration() {
Map configuration = new HashMap();
configuration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configuration.put(ConsumerConfig.GROUP_ID_CONFIG, "sampleGroupId");
configuration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configuration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configuration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return configuration;
}

@Bean
ReceiverOptions kafkaReceiverOptions(@Value("${kafka.topic.in}") String inTopicName) {
ReceiverOptions options = ReceiverOptions.create(kafkaConsumerConfiguration());
return options.addAssignListener(assignments -> log.info("Assigned: " + assignments))
.subscription(Collections.singletonList(inTopicName));
}

@Bean
KafkaReceiver reactiveKafkaReceiver(ReceiverOptions kafkaReceiverOptions) {
return KafkaReceiver.create(kafkaReceiverOptions);
}

@EventListener(ApplicationStartedEvent.class)
public void onMessage() {
reactiveKafkaReceiver
.receive()
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(scheduler)
.map(r -> processRecord(partitionFlux.key(), r))
.sample(Duration.ofMillis(5000))
.concatMap(offset -> offset.commit()))
.subscribe();
}
< /code>
При запуске приложения, из журналов я заметил, что планировщик создал 5 потоков по одному для каждого раздела. Каждый поток отвечает за потребление события из этого раздела.
Проблема я здесь сталкиваюсь здесь, что все 5 потоков не работают одновременно и, следовательно, неспособны потреблять из всех частиц параллельно, даже если каждый из разделов имеет 1000 рекордов, которые будут обработаны. /> может кто -нибудь, пожалуйста, помогите мне с тем, что мне здесь не хватает? Или как мы можем прочитать со всех разделов параллельно, а также позаботиться о заказе в разделе.

Подробнее здесь: https://stackoverflow.com/questions/748 ... isnt-worki
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Кафка, производящая и потребляющая в Python, не работает. Кафка в Docker и создание Totopics работает
    Anonymous » » в форуме Python
    0 Ответы
    5 Просмотры
    Последнее сообщение Anonymous
  • Кафка получите количество разделов для темы
    Anonymous » » в форуме JAVA
    0 Ответы
    2 Просмотры
    Последнее сообщение Anonymous
  • Запись видео и одновременная обработка кадров в Android с помощью CameraX
    Гость » » в форуме Android
    0 Ответы
    46 Просмотры
    Последнее сообщение Гость
  • Одновременная обработка в настольной программе C#
    Anonymous » » в форуме C#
    0 Ответы
    10 Просмотры
    Последнее сообщение Anonymous
  • Неожиданные типы в Pycharm меньше, чем функция с классом данных и общим упорядочением
    Anonymous » » в форуме Python
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»