Проблемы производительности KafkaConsumer API и предложения по оптимизации (клиенты Kafka 3.6.1)JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Проблемы производительности KafkaConsumer API и предложения по оптимизации (клиенты Kafka 3.6.1)

Сообщение Anonymous »

Сценарий: пользователь вводит смещение в качестве входных данных, и на основе смещения нам нужно предоставить 1000 сообщений из темы kafka и следующего смещения. Тема kafka содержит только один раздел.
Мы пытаемся перейти со старой Kafka на новую Kafka. В старой Kafka мы использовали такой код:
старый код (клиенты Kafka 0.8.1):

Код: Выделить всё

FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic,
a_partition, readOffset, fetchSize).build();
FetchResponse fetchResponse = consumer.fetch(req);
ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);
Этот код очень быстрый, и мы пытаемся достичь того же, используя KafkaConsumer API, но получаем медлительность.
Используется новый код kafkaconsumer (kafkaclients 3.6 .1). )

Код: Выделить всё

TopicPartition topicPartition = new TopicPartition("Test", 0);
consumer.seekToBeginning(Collections.singletonList(topicPartition));
long kafkaEarliestOffset = consumer.position(topicPartition);
try (KafkaConsumer < String, String > consumer = KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
consumer.partitionsFor(topicName);
consumer.assign(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, readOffset);
do {
ConsumerRecords < String, String > records =
consumer.poll(Duration.ofMillis(1500));
} while (!end)

public static KafkaConsumer createConsumer(String clientName,int fetchSize) {
Properties props = new Properties();
String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerslist");
String groupId = Config.getConsumerPropValue("group.id");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,clientName);
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
return new KafkaConsumer(props);
}
Объяснение кода: назначение, поиск, поиск самого раннего смещения и выборка записей занимают время.

Код: Выделить всё

Poll Records Count: 500 diff: 1284
Poll Records Count: 500 diff: 4
Для первых 500 записей потребовалось 1284 мс, а для следующих 500 записей — 4 мс.
почему такая большая разница? Я хотел бы улучшить производительность первого опроса?
Как я могу 1) получить первые 500 записей за меньшее время 2) эффективно выполнить Consumer.assign,consumer.seek и получить самое раннее смещение в Кафке?

Подробнее здесь: https://stackoverflow.com/questions/792 ... s-kafka-cl
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»