Я использую Kakfa, используя Docker-контейнер с другими зависимостями.
Ниже я покажу вам содержимое docker-compose и конфигурации, которые у меня есть в потребительском микросервисе.
Это мой docker-compose для Kafka:
Код: Выделить всё
version: '3.5'
services:
zookeeper:
image: confluentinc/cp-zookeeper
ports:
- 2181:2181 #
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka
depends_on:
- zookeeper
ports:
- 9092:9092 #
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
schema-registry:
image: confluentinc/cp-schema-registry:4.1.1
hostname: schema-registry
ports:
- "38081:38081"
depends_on:
- kafka
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_LISTENERS: http://schema-registry:38081
SCHEMA_REGISTRY_DEBUG: "true"
kafka-rest:
image: confluentinc/cp-kafka-rest:4.1.1
hostname: kafka-rest
ports:
- "38082:38082"
depends_on:
- schema-registry
environment:
KAFKA_REST_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_REST_SCHEMA_REGISTRY_URL: schema-registry:38081
KAFKA_REST_HOST_NAME: kafka-rest
KAFKA_REST_LISTENERS: http://kafka-rest:38082
Код: Выделить всё
kafka:
bootstrap:
servers: ${KAFKA_HOST:`localhost:9092`}
enabled: true
producers:
default:
retries: ${KAFKA_RETRIES:5}
retry.backoff.ms: ${KAFKA_RETRY_BACKOFF:3000}
Код: Выделить всё
@KafkaListener
public class SpecificTaskMedicalRecordsListener {
private final ManualTenantResolver manualTenantResolver;
private final SpecificTaskService specificTaskService;
private final ObjectMapper objectMapper;
public SpecificTaskMedicalRecordsListener(
ManualTenantResolver manualTenantResolver, SpecificTaskService specificTaskService) {
this.manualTenantResolver = manualTenantResolver;
this.specificTaskService = specificTaskService;
this.objectMapper = new ObjectMapper();
}
@Topic("core-specific-task-save")
public void save(KafkaRequest kafkaRequest) {
SpecificTaskRequest request = objectMapper.convertValue(kafkaRequest.getResponse(), SpecificTaskRequest.class);
manualTenantResolver.setManualTenant(LogUtil.getTenancyFromToken(kafkaRequest.getToken()));
specificTaskService.save(request);
}
}
Код: Выделить всё
implementation('io.micronaut.kafka:micronaut-kafka:4.5.0')
Проведенные мной тесты касались изменения имен, о которых я упоминал. Но это временное решение. Более того, я не обнаружил временного шаблона, позволяющего узнать, когда мне следует изменить имя, чтобы избежать ошибки. Я подумал, что, возможно, это проблема с кешем, но я не знаю, как ее проверить.
Что мне нужно, так это чтобы KafkaListeners могли без проблем обрабатывать сообщения.
Подробнее здесь: https://stackoverflow.com/questions/791 ... -micronaut
Мобильная версия