Мы пытаемся перейти со старой Kafka на новую Kafka. В старой Kafka мы использовали такой код:
старый код (клиенты Kafka 0.8.1):
Код: Выделить всё
FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic,
a_partition, readOffset, fetchSize).build();
FetchResponse fetchResponse = consumer.fetch(req);
ByteBufferMessageSet set = fetchResponse.messageSet(a_topic, a_partition);
Используется новый код kafkaconsumer (kafkaclients 3.6 .1). )
Код: Выделить всё
TopicPartition topicPartition = new TopicPartition("Test", 0);
consumer.seekToBeginning(Collections.singletonList(topicPartition));
long kafkaEarliestOffset = consumer.position(topicPartition);
try (KafkaConsumer < String, String > consumer = KafkaConsumerFactory.createConsumer(clientName, fetchSize)) {
consumer.partitionsFor(topicName);
consumer.assign(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, readOffset);
do {
ConsumerRecords < String, String > records =
consumer.poll(Duration.ofMillis(1500));
} while (!end)
public static KafkaConsumer createConsumer(String clientName,int fetchSize) {
Properties props = new Properties();
String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerslist");
String groupId = Config.getConsumerPropValue("group.id");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,clientName);
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
return new KafkaConsumer(props);
}
Код: Выделить всё
Poll Records Count: 500 diff: 1284
Poll Records Count: 500 diff: 4
почему такая большая разница? Я хотел бы улучшить производительность первого опроса?
Как я могу 1) получить первые 500 записей за меньшее время 2) эффективно выполнить Consumer.assign,consumer.seek и получить самое раннее смещение в Кафке?
Подробнее здесь: https://stackoverflow.com/questions/792 ... s-kafka-cl
Мобильная версия