Я столкнулся с проблемой, когда обнаружил, что Kafka отправляет сообщения только в четные разделы.
После изучения я обнаружил, что Kafka сообщает об открытой ошибке, при которой метод раздела вызывается дважды, что приводит к пропуску нечетных разделов.
https://issues.apache.org/jira/browse/KAFKA-9965
В октябре был поднят PR 2021 г. и после этого не удалось найти никаких дальнейших обновлений.
Кто-нибудь знает, исправила ли Kafka эту проблему и выпустила ли какой-нибудь патч? Я использую Kafka версии 2.4.1.
Смотрите комментарий к заявке.
RoundRobinPartitioner утверждает, что обеспечит равное распределение записей по разделам. Однако с учетом усовершенствований, внесенных в KIP-480, это может быть не так. В некоторых случаях при запуске нового пакета разделитель может быть вызван второй раз для одной и той же записи:
https://github.com/apache/kafka/blob/2. ... .java#L909
https://github.com/apache/kafka/blob/2. ... .java#L934
Каждый раз, когда вызывается разделитель, он увеличивает счетчик в RoundRobinPartitioner, поэтому это может привести к неравномерному распределению.
Фрагмент кода
package com.example.javakafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class ProducerDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);
public static void main(String[] args) {
String bootstrapServers = "127.0.0.1:9092";
// create Producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty("partitioner.class", "com.example.javakafka.RoundRobinPartitionerNew");
// create the producer
KafkaProducer producer = new KafkaProducer(properties);
// create a producer record
ProducerRecord producerRecord ;
int counter = 0;
while(counter < 40) {
// create a producer record
producerRecord =
new ProducerRecord("demo-java-topic-10", "hello world");
// send data - asynchronous
producer.send(producerRecord);
counter++;
}
// flush data - synchronous
producer.flush();
// flush and close producer
producer.close();
}
}
Класс RoundRobinPartitionerNew — тот же код, что и в Kafka API org.apache.kafka.clients.producer.RoundRobinPartitioner
package com.example.javakafka;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
public class RoundRobinPartitionerNew implements Partitioner {
private final ConcurrentMap topicCounterMap = new ConcurrentHashMap();
public RoundRobinPartitionerNew() {
}
public void configure(Map configs) {
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List
partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = this.nextValue(topic);
List availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
return new AtomicInteger(0);
});
return counter.getAndIncrement();
}
public void close() {
}
}
Если я использую версию 1.0.0 и разделитель «org.apache.kafka.clients.producer.RoundRobinPartitionerNew»
org.apache.kafka
kafka-clients
1.0.0
он будет правильно распределять сообщения следующим образом:
CreateTime:1673933856783 Partition:9 null hello world
CreateTime:1673933856784 Partition:9 null hello world
CreateTime:1673933856784 Partition:9 null hello world
CreateTime:1673933856784 Partition:9 null hello world
CreateTime:1673933856783 Partition:8 null hello world
CreateTime:1673933856784 Partition:8 null hello world
CreateTime:1673933856784 Partition:8 null hello world
CreateTime:1673933856784 Partition:8 null hello world
CreateTime:1673933856783 Partition:5 null hello world
CreateTime:1673933856784 Partition:5 null hello world
CreateTime:1673933856784 Partition:5 null hello world
CreateTime:1673933856784 Partition:5 null hello world
CreateTime:1673933856783 Partition:4 null hello world
CreateTime:1673933856784 Partition:4 null hello world
CreateTime:1673933856784 Partition:4 null hello world
CreateTime:1673933856784 Partition:4 null hello world
CreateTime:1673933856784 Partition:7 null hello world
CreateTime:1673933856784 Partition:7 null hello world
CreateTime:1673933856784 Partition:7 null hello world
CreateTime:1673933856784 Partition:7 null hello world
CreateTime:1673933856784 Partition:6 null hello world
CreateTime:1673933856784 Partition:6 null hello world
CreateTime:1673933856784 Partition:6 null hello world
CreateTime:1673933856784 Partition:6 null hello world
CreateTime:1673933856783 Partition:1 null hello world
CreateTime:1673933856784 Partition:1 null hello world
CreateTime:1673933856784 Partition:1 null hello world
CreateTime:1673933856784 Partition:1 null hello world
CreateTime:1673933856781 Partition:0 null hello world
CreateTime:1673933856784 Partition:0 null hello world
CreateTime:1673933856784 Partition:0 null hello world
CreateTime:1673933856784 Partition:0 null hello world
CreateTime:1673933856784 Partition:3 null hello world
CreateTime:1673933856784 Partition:3 null hello world
CreateTime:1673933856784 Partition:3 null hello world
CreateTime:1673933856784 Partition:3 null hello world
CreateTime:1673933856783 Partition:2 null hello world
CreateTime:1673933856784 Partition:2 null hello world
CreateTime:1673933856784 Partition:2 null hello world
CreateTime:1673933856784 Partition:2 null hello world
Если я использую версию 2.4.1 и более поздние и использую тот же разделитель
org.apache.kafka
kafka-clients
2.4.0
Я попадаю ниже, где назначены только четные разделы.
CreateTime:1673933438685 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
Подробнее здесь: https://stackoverflow.com/questions/751 ... -kafka-2-4
Неравномерное распределение Kafka RoundRobinPartitions в Kafka 2.4+ ⇐ JAVA
Программисты JAVA общаются здесь
-
Anonymous
1767517505
Anonymous
Я столкнулся с проблемой, когда обнаружил, что Kafka отправляет сообщения только в четные разделы.
После изучения я обнаружил, что Kafka сообщает об открытой ошибке, при которой метод раздела вызывается дважды, что приводит к пропуску нечетных разделов.
https://issues.apache.org/jira/browse/KAFKA-9965
В октябре был поднят PR 2021 г. и после этого не удалось найти никаких дальнейших обновлений.
[b]Кто-нибудь знает, исправила ли Kafka эту проблему и выпустила ли какой-нибудь патч? Я использую Kafka версии 2.4.1.[/b]
Смотрите комментарий к заявке.
RoundRobinPartitioner утверждает, что обеспечит равное распределение записей по разделам. Однако с учетом усовершенствований, внесенных в KIP-480, это может быть не так. В некоторых случаях при запуске нового пакета разделитель может быть вызван второй раз для одной и той же записи:
https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L909
https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L934
Каждый раз, когда вызывается разделитель, он увеличивает счетчик в RoundRobinPartitioner, поэтому это может привести к неравномерному распределению.
[b]Фрагмент кода[/b]
package com.example.javakafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class ProducerDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);
public static void main(String[] args) {
String bootstrapServers = "127.0.0.1:9092";
// create Producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty("partitioner.class", "com.example.javakafka.RoundRobinPartitionerNew");
// create the producer
KafkaProducer producer = new KafkaProducer(properties);
// create a producer record
ProducerRecord producerRecord ;
int counter = 0;
while(counter < 40) {
// create a producer record
producerRecord =
new ProducerRecord("demo-java-topic-10", "hello world");
// send data - asynchronous
producer.send(producerRecord);
counter++;
}
// flush data - synchronous
producer.flush();
// flush and close producer
producer.close();
}
}
Класс RoundRobinPartitionerNew — тот же код, что и в Kafka API org.apache.kafka.clients.producer.RoundRobinPartitioner
package com.example.javakafka;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
public class RoundRobinPartitionerNew implements Partitioner {
private final ConcurrentMap topicCounterMap = new ConcurrentHashMap();
public RoundRobinPartitionerNew() {
}
public void configure(Map configs) {
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List
partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = this.nextValue(topic);
List availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
return new AtomicInteger(0);
});
return counter.getAndIncrement();
}
public void close() {
}
}
Если я использую версию 1.0.0 и разделитель «org.apache.kafka.clients.producer.RoundRobinPartitionerNew»
org.apache.kafka
kafka-clients
1.0.0
он будет правильно распределять сообщения следующим образом:
CreateTime:1673933856783 Partition:9 null hello world
CreateTime:1673933856784 Partition:9 null hello world
CreateTime:1673933856784 Partition:9 null hello world
CreateTime:1673933856784 Partition:9 null hello world
CreateTime:1673933856783 Partition:8 null hello world
CreateTime:1673933856784 Partition:8 null hello world
CreateTime:1673933856784 Partition:8 null hello world
CreateTime:1673933856784 Partition:8 null hello world
CreateTime:1673933856783 Partition:5 null hello world
CreateTime:1673933856784 Partition:5 null hello world
CreateTime:1673933856784 Partition:5 null hello world
CreateTime:1673933856784 Partition:5 null hello world
CreateTime:1673933856783 Partition:4 null hello world
CreateTime:1673933856784 Partition:4 null hello world
CreateTime:1673933856784 Partition:4 null hello world
CreateTime:1673933856784 Partition:4 null hello world
CreateTime:1673933856784 Partition:7 null hello world
CreateTime:1673933856784 Partition:7 null hello world
CreateTime:1673933856784 Partition:7 null hello world
CreateTime:1673933856784 Partition:7 null hello world
CreateTime:1673933856784 Partition:6 null hello world
CreateTime:1673933856784 Partition:6 null hello world
CreateTime:1673933856784 Partition:6 null hello world
CreateTime:1673933856784 Partition:6 null hello world
CreateTime:1673933856783 Partition:1 null hello world
CreateTime:1673933856784 Partition:1 null hello world
CreateTime:1673933856784 Partition:1 null hello world
CreateTime:1673933856784 Partition:1 null hello world
CreateTime:1673933856781 Partition:0 null hello world
CreateTime:1673933856784 Partition:0 null hello world
CreateTime:1673933856784 Partition:0 null hello world
CreateTime:1673933856784 Partition:0 null hello world
CreateTime:1673933856784 Partition:3 null hello world
CreateTime:1673933856784 Partition:3 null hello world
CreateTime:1673933856784 Partition:3 null hello world
CreateTime:1673933856784 Partition:3 null hello world
CreateTime:1673933856783 Partition:2 null hello world
CreateTime:1673933856784 Partition:2 null hello world
CreateTime:1673933856784 Partition:2 null hello world
CreateTime:1673933856784 Partition:2 null hello world
Если я использую версию 2.4.1 и более поздние и использую тот же разделитель
org.apache.kafka
kafka-clients
2.4.0
Я попадаю ниже, где назначены только четные разделы.
CreateTime:1673933438685 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
Подробнее здесь: [url]https://stackoverflow.com/questions/75137779/kafka-roundrobinpartitioner-uneven-distribution-in-kafka-2-4[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия