Неравномерное распределение Kafka RoundRobinPartitions в Kafka 2.4+JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Неравномерное распределение Kafka RoundRobinPartitions в Kafka 2.4+

Сообщение Anonymous »

Я столкнулся с проблемой, когда обнаружил, что Kafka отправляет сообщения только в четные разделы.
После изучения я обнаружил, что Kafka сообщает об открытой ошибке, при которой метод раздела вызывается дважды, что приводит к пропуску нечетных разделов.
https://issues.apache.org/jira/browse/KAFKA-9965
В октябре был поднят PR 2021 г. и после этого не удалось найти никаких дальнейших обновлений.
Кто-нибудь знает, исправила ли Kafka эту проблему и выпустила ли какой-нибудь патч? Я использую Kafka версии 2.4.1.
Смотрите комментарий к заявке.
RoundRobinPartitioner утверждает, что обеспечит равное распределение записей по разделам. Однако с учетом усовершенствований, внесенных в KIP-480, это может быть не так. В некоторых случаях при запуске нового пакета разделитель может быть вызван второй раз для одной и той же записи:
https://github.com/apache/kafka/blob/2. ... .java#L909
https://github.com/apache/kafka/blob/2. ... .java#L934
Каждый раз, когда вызывается разделитель, он увеличивает счетчик в RoundRobinPartitioner, поэтому это может привести к неравномерному распределению.
Фрагмент кода
package com.example.javakafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class ProducerDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);

public static void main(String[] args) {

String bootstrapServers = "127.0.0.1:9092";

// create Producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty("partitioner.class", "com.example.javakafka.RoundRobinPartitionerNew");

// create the producer
KafkaProducer producer = new KafkaProducer(properties);

// create a producer record
ProducerRecord producerRecord ;

int counter = 0;

while(counter < 40) {

// create a producer record
producerRecord =
new ProducerRecord("demo-java-topic-10", "hello world");

// send data - asynchronous
producer.send(producerRecord);
counter++;
}

// flush data - synchronous
producer.flush();
// flush and close producer
producer.close();
}
}

Класс RoundRobinPartitionerNew — тот же код, что и в Kafka API org.apache.kafka.clients.producer.RoundRobinPartitioner
package com.example.javakafka;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class RoundRobinPartitionerNew implements Partitioner {
private final ConcurrentMap topicCounterMap = new ConcurrentHashMap();

public RoundRobinPartitionerNew() {
}

public void configure(Map configs) {
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List
partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = this.nextValue(topic);
List availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
}

private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
return new AtomicInteger(0);
});
return counter.getAndIncrement();
}

public void close() {
}
}

Если я использую версию 1.0.0 и разделитель «org.apache.kafka.clients.producer.RoundRobinPartitionerNew»

org.apache.kafka
kafka-clients
1.0.0


он ​​будет правильно распределять сообщения следующим образом:
CreateTime:1673933856783 Partition:9 null hello world
CreateTime:1673933856784 Partition:9 null hello world
CreateTime:1673933856784 Partition:9 null hello world
CreateTime:1673933856784 Partition:9 null hello world
CreateTime:1673933856783 Partition:8 null hello world
CreateTime:1673933856784 Partition:8 null hello world
CreateTime:1673933856784 Partition:8 null hello world
CreateTime:1673933856784 Partition:8 null hello world
CreateTime:1673933856783 Partition:5 null hello world
CreateTime:1673933856784 Partition:5 null hello world
CreateTime:1673933856784 Partition:5 null hello world
CreateTime:1673933856784 Partition:5 null hello world
CreateTime:1673933856783 Partition:4 null hello world
CreateTime:1673933856784 Partition:4 null hello world
CreateTime:1673933856784 Partition:4 null hello world
CreateTime:1673933856784 Partition:4 null hello world
CreateTime:1673933856784 Partition:7 null hello world
CreateTime:1673933856784 Partition:7 null hello world
CreateTime:1673933856784 Partition:7 null hello world
CreateTime:1673933856784 Partition:7 null hello world
CreateTime:1673933856784 Partition:6 null hello world
CreateTime:1673933856784 Partition:6 null hello world
CreateTime:1673933856784 Partition:6 null hello world
CreateTime:1673933856784 Partition:6 null hello world
CreateTime:1673933856783 Partition:1 null hello world
CreateTime:1673933856784 Partition:1 null hello world
CreateTime:1673933856784 Partition:1 null hello world
CreateTime:1673933856784 Partition:1 null hello world
CreateTime:1673933856781 Partition:0 null hello world
CreateTime:1673933856784 Partition:0 null hello world
CreateTime:1673933856784 Partition:0 null hello world
CreateTime:1673933856784 Partition:0 null hello world
CreateTime:1673933856784 Partition:3 null hello world
CreateTime:1673933856784 Partition:3 null hello world
CreateTime:1673933856784 Partition:3 null hello world
CreateTime:1673933856784 Partition:3 null hello world
CreateTime:1673933856783 Partition:2 null hello world
CreateTime:1673933856784 Partition:2 null hello world
CreateTime:1673933856784 Partition:2 null hello world
CreateTime:1673933856784 Partition:2 null hello world

Если я использую версию 2.4.1 и более поздние и использую тот же разделитель

org.apache.kafka
kafka-clients
2.4.0


Я попадаю ниже, где назначены только четные разделы.
CreateTime:1673933438685 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:8 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:4 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:6 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:0 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world
CreateTime:1673933438688 Partition:2 null hello world


Подробнее здесь: https://stackoverflow.com/questions/751 ... -kafka-2-4
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»