Как реализовать многопоточный продюсер Kafka на JavaJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Как реализовать многопоточный продюсер Kafka на Java

Сообщение Anonymous »

У меня есть производитель Kafka, который отправляет сообщения, используя метод Sendmessage ниже. Метод принимает имя темы, имя группы, ключ и полезную нагрузку сообщения, затем обрабатывает и публикует несколько сообщений из массива JSON. < /P>
public void sendMessage(String topicName, String groupName, String key, String msg) throws Exception {
if (msg != null) {
JSONObject jsonReq = new JSONObject(msg);

if (key == null || topicName == null || "".equals(key) || "".equals(topicName)) {
throw new CustomException("Invalid key/topicname.");
}
if (jsonReq.has(groupName)) {
JSONArray messages = jsonReq.getJSONArray(groupName);
if (messages.length() == 0) {
log.info("No messages found in groupName: {}. Skipping processing.", groupName);
return;
}

List messageIds = new ArrayList();
AtomicBoolean errorFlag = new AtomicBoolean(false);
long startTime = System.currentTimeMillis();

for (int i = 0; i < messages.length(); i++) {
JSONObject obj = messages.getJSONObject(i);
if (obj.has(key) && StringUtils.isNotBlank(obj.getString(key))) {
ProducerRecord producerRecord =
new ProducerRecord(topicName, obj.getString(key), obj.toString());

producer.send(producerRecord, new CRKafkaCallBackHandler(producerRecord, errorFlag));
messageIds.add(topicName + "\t" + obj.getString(key));
} else {
throw new CustomException("Mandatory property '" + key + "' is missing in message: " + obj.toString());
}
}

long endTime = System.currentTimeMillis();
log.info("Total Messages: {} For Topic: {} Time Taken: {} ms", messages.length(), topicName, (endTime - startTime));

if (errorFlag.get()) {
throw new CustomException("Failed to publish one or more messages to Kafka");
}
} else {
throw new CustomException("Invalid groupName found in request.");
}
} else {
throw new CustomException("Received a NULL or invalid message.");
}
log.info("Message processing completed successfully For Topic: {}", topicName);
}
< /code>
Прямо сейчас этот метод работает в одном из стертовых, и мы получаем timeoutexception org.apache.kafka.common.errors.timeoutexception: истекает 18 записей 120001 MS Прошло с момента создания партии, я подозреваю, что это может быть узким местом под высокой нагрузкой. Я хочу повысить пропускную способность, внедрив многопоточного производителя Kafka. Должен ли я создать пул потоков с фиксированным размером и отправить задачи Sendmessage, или есть более удобный подход Kafka?
Как я могу обеспечить безопасность потока? Поскольку метод Кафки Send () асинхронно, мне нужно беспокоиться о проблемах параллелизма с экземпляром производителя?

Подробнее здесь: https://stackoverflow.com/questions/794 ... er-in-java
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»