KafkaListener не работает через некоторое время (Micronaut)JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 KafkaListener не работает через некоторое время (Micronaut)

Сообщение Anonymous »

Я использую Kafka для взаимодействия нескольких микросервисов. По сути, у меня есть микросервис, который регистрирует объекты, а затем публикует их в виде сообщения Kafka, чтобы слушатели могли их использовать и синхронизировать. Проблема, с которой я столкнулся, заключается в том, что по какой-то причине через некоторое время некоторые KafkaListeners перестают работать с некоторыми микросервисами. Я выполнил некоторые проверки, и проблема, похоже, не в публикации, поскольку у меня есть экземпляр KafkaMagic, с помощью которого я могу проверить правильность публикации сообщения и его содержимого. Самое странное во всем этом то, что я понял, что, просто изменив имя класса слушателя, он автоматически снова начинает работать и даже начинает потреблять сообщения, которые раньше игнорировал. Например: если у прослушивателя есть имя «PatientListener» и оно не работает, простое изменение его на «PatientListenerX» решит проблему. Однако это временное решение, поскольку через некоторое время они снова перестают работать. Я не знаю, является ли это проблемой микросервисов или проблемой конфигурации Kafka как таковой.
Я использую Kakfa, используя Docker-контейнер с другими зависимостями.
Ниже я покажу вам содержимое docker-compose и конфигурации, которые у меня есть в потребительском микросервисе.
Это мой docker-compose для Kafka:

Код: Выделить всё

version: '3.5'

services:
zookeeper:
image: confluentinc/cp-zookeeper
ports:
- 2181:2181 # 
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka
depends_on:
- zookeeper
ports:
- 9092:9092 # 
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

schema-registry:
image: confluentinc/cp-schema-registry:4.1.1
hostname: schema-registry
ports:
- "38081:38081"
depends_on:
- kafka
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_LISTENERS: http://schema-registry:38081
SCHEMA_REGISTRY_DEBUG: "true"

kafka-rest:
image: confluentinc/cp-kafka-rest:4.1.1
hostname: kafka-rest
ports:
- "38082:38082"
depends_on:
- schema-registry
environment:
KAFKA_REST_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_REST_SCHEMA_REGISTRY_URL: schema-registry:38081
KAFKA_REST_HOST_NAME: kafka-rest
KAFKA_REST_LISTENERS: http://kafka-rest:38082
Это моя конфигурация для потребителя в application.yml:

Код: Выделить всё

kafka:
bootstrap:
servers: ${KAFKA_HOST:`localhost:9092`}
enabled: true
producers:
default:
retries: ${KAFKA_RETRIES:5}
retry.backoff.ms: ${KAFKA_RETRY_BACKOFF:3000}
А это мой KafkaListener:

Код: Выделить всё

@KafkaListener
public class SpecificTaskMedicalRecordsListener {
private final ManualTenantResolver manualTenantResolver;
private final SpecificTaskService specificTaskService;
private final ObjectMapper objectMapper;
public SpecificTaskMedicalRecordsListener(
ManualTenantResolver manualTenantResolver, SpecificTaskService specificTaskService) {
this.manualTenantResolver = manualTenantResolver;
this.specificTaskService = specificTaskService;
this.objectMapper = new ObjectMapper();
}

@Topic("core-specific-task-save")
public void save(KafkaRequest kafkaRequest) {
SpecificTaskRequest request = objectMapper.convertValue(kafkaRequest.getResponse(), SpecificTaskRequest.class);
manualTenantResolver.setManualTenant(LogUtil.getTenancyFromToken(kafkaRequest.getToken()));
specificTaskService.save(request);
}
}
Используется следующая зависимость Micronaut:

Код: Выделить всё

implementation('io.micronaut.kafka:micronaut-kafka:4.5.0')

Я очень ценю вашу помощь.
Проведенные мной тесты касались изменения имен, о которых я упоминал. Но это временное решение. Более того, я не обнаружил временного шаблона, позволяющего узнать, когда мне следует изменить имя, чтобы избежать ошибки. Я подумал, что, возможно, это проблема с кешем, но я не знаю, как ее проверить.
Что мне нужно, так это чтобы KafkaListeners могли без проблем обрабатывать сообщения.

Подробнее здесь: https://stackoverflow.com/questions/791 ... -micronaut
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»