Я использую kafka @kafkalistener с @transactional ("kafkatransactionmanager"). Мой поток будет чем -то чтением/процессом/совершением использования потребителя транзакций.
public void listen(List records) {
// do some database activities
final Object result = kafkaTemplate.send(producerRecord).get();
//I am sending data and waiting for response to capture offset and store in DB
long offset = ((SendResult) result).getRecordMetadata().offset();
int partition = ((SendResult) result).getRecordMetadata().partition();
log.info(offset ,partition );
}
< /code>
Когда что -то произойдет до или у отправителя, затем откатается транзакция, и он читается из того же смещения. Комплект смещений
В этом случае потребитель снова будет прочитать одно и то же смещение (я думаю, так как транзакция успешно завершена)? /> Я использую acks = -1 и idempotence = true. < /p>
Подробнее здесь: https://stackoverflow.com/questions/796 ... pired-befo
Org.apache.kafka.common.errors.timeoutexception: Тайм -аут 60000 мс истек, прежде чем успешно совершить смещения с Kafka ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение