Я хочу запустить интеграционные тесты, которые проверят мой прослушиватель Kafka и сериализацию avro. для этого требуется реестр Kafka и Schema (транзитивно также Zookeeper).
При тестировании мне сейчас нужен docker-compose.yml, но я хочу уменьшить количество ошибок пользователя, создавая необходимые контейнеры через тест-контейнеры. Экземпляры Kafka и Zookeeper запускаются аккуратно и, похоже, работают нормально - мое приложение может создавать необходимые темы, а прослушиватель также подписан, я даже могу отправлять сообщения через производителя консоли Kafka.
Что не работает, так это SchemaRegistry. Контейнер запускается, видимо подключается к ЗК но не может установить соединение с брокером. Он повторяет попытку подключения в течение некоторого времени, пока не истечет время ожидания, после чего контейнер будет остановлен. Поэтому я не могу зарегистрироваться и прочитать свои схемы avro для (де-)сериализации в моем тесте, который из-за этого терпит неудачу.
Я не могу найти причину, по которой SR, очевидно, может подключиться к ZK, но не может найти моего брокера.
Кто-то тоже сталкивался с этой проблемой? Вам удалось это запустить? Если да, то как?
Мне нужно, чтобы Kafka и тестовые контейнеры реестра схемы были полностью доступны для моих тестов, поэтому исключение любого из них не является вариантом.
Я также мог бы продолжать использовать docker-compose.yml, но мне бы очень хотелось настроить свою тестовую среду полностью программно.
Контейнер реестра схемы регистрирует следующее:
2023-02-08 16:56:09 [2023-02-08 15:56:09,556] INFO Session establishment complete on server zookeeper/192.168.144.2:2181, session id = 0x1000085b81e0003, negotiated timeout = 40000 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO Session: 0x1000085b81e0003 closed (org.apache.zookeeper.ZooKeeper)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO EventThread shut down for session: 0x1000085b81e0003 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,787] INFO AdminClientConfig values:
/* Omitted for brevity */
(org.apache.kafka.clients.admin.AdminClientConfig)
2023-02-08 16:56:10 [2023-02-08 15:56:10,284] INFO Kafka version: 7.3.1-ccs (org.apache.kafka.common.utils.AppInfoParser)
2023-02-08 16:56:10 [2023-02-08 15:56:10,284] INFO Kafka commitId: 8628b0341c3c4676 (org.apache.kafka.common.utils.AppInfoParser)
2023-02-08 16:56:10 [2023-02-08 15:56:10,284] INFO Kafka startTimeMs: 1675871770281 (org.apache.kafka.common.utils.AppInfoParser)
2023-02-08 16:56:10 [2023-02-08 15:56:10,308] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:10 [2023-02-08 15:56:10,313] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:54776) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
/* These lines repeat a few times until the container times out and exits. */
2023-02-08 16:56:50 [2023-02-08 15:56:50,144] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:50 [2023-02-08 15:56:50,144] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:54776) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:50 [2023-02-08 15:56:50,298] ERROR Error while getting broker list. (io.confluent.admin.utils.ClusterStatus)
2023-02-08 16:56:50 java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2023-02-08 16:56:50 at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
2023-02-08 16:56:50 at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
2023-02-08 16:56:50 at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
2023-02-08 16:56:50 at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:147)
2023-02-08 16:56:50 at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:149)
2023-02-08 16:56:50 Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2023-02-08 16:56:51 [2023-02-08 15:56:51,103] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:51 [2023-02-08 15:56:51,103] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:54776) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:51 [2023-02-08 15:56:51,300] INFO Expected 1 brokers but found only 0. Trying to query Kafka for metadata again ... (io.confluent.admin.utils.ClusterStatus)
2023-02-08 16:56:51 [2023-02-08 15:56:51,300] ERROR Expected 1 brokers but found only 0. Brokers found []. (io.confluent.admin.utils.ClusterStatus)
2023-02-08 16:56:51 Using log4j config /etc/schema-registry/log4j.properties
Мой базовый тестовый класс. ИТ-специалисты, которым необходимо расширить этот класс Kafka
@Testcontainers
@SpringBootTest
@Slf4j
public class AbstractIT {
private static final Network network = Network.newNetwork();
protected static GenericContainer ZOOKEEPER = new GenericContainer(
DockerImageName.parse("confluentinc/cp-zookeeper:7.2.0"))
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv(Map.of(
"ZOOKEEPER_CLIENT_PORT", "2181",
"ZOOKEEPER_TICK_TIME", "2000"));
protected static final KafkaContainer KAFKA = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka"))
.withExternalZookeeper("zookeeper:2181")
.dependsOn(ZOOKEEPER)
.withNetwork(network)
.withNetworkAliases("broker");
protected static final GenericContainer SCHEMAREGSISTRY = new GenericContainer(
DockerImageName.parse("confluentinc/cp-schema-registry"))
.dependsOn(ZOOKEEPER, KAFKA)
.withEnv(Map.of(
"SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL", "zookeeper:2181",
"SCHEMA_REGISTRY_HOST_NAME", "schemaregistry",
"SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085",
"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:9092"))
.withNetwork(network)
.withNetworkAliases("schemaregistry");
@DynamicPropertySource
static void registerPgProperties(DynamicPropertyRegistry registry) {
registry.add("bootstrap.servers", KAFKA::getBootstrapServers);
registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
registry.add("spring.kafka.consumer.auto-offset-reset", () -> "earliest");
registry.add("spring.data.mongodb.uri", MONGODB::getConnectionString);
registry.add("spring.data.mongodb.database", () ->"test");
}
//container startup, shutdown as well as topic creation omitted for brevity
}
Мой docker-compose.yml, который я хочу реплицировать с помощью тестовых контейнеров
version: "3.5"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.2.0
hostname: broker
container_name: broker
restart: always
depends_on:
- zookeeper
ports:
- "29092:29092"
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_SCHEMA_REGISTRY_URL: "schemaregistry:8085"
schemaregistry:
container_name: schemaregistry
hostname: schemaregistry
image: confluentinc/cp-schema-registry:5.1.2
restart: always
depends_on:
- zookeeper
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"
ports:
- "8085:8085"
volumes:
- "./src/main/avro/:/etc/schema"
Подробнее здесь: https://stackoverflow.com/questions/753 ... -container
Testcontainers SchemaRegistry не может подключиться к контейнеру Kafka ⇐ JAVA
Программисты JAVA общаются здесь
1769605169
Anonymous
Я хочу запустить интеграционные тесты, которые проверят мой прослушиватель Kafka и сериализацию avro. для этого требуется реестр Kafka и Schema (транзитивно также Zookeeper).
При тестировании мне сейчас нужен docker-compose.yml, но я хочу уменьшить количество ошибок пользователя, создавая необходимые контейнеры через тест-контейнеры. Экземпляры Kafka и Zookeeper запускаются аккуратно и, похоже, работают нормально - мое приложение может создавать необходимые темы, а прослушиватель также подписан, я даже могу отправлять сообщения через производителя консоли Kafka.
Что не работает, так это SchemaRegistry. Контейнер запускается, видимо подключается к ЗК но не может установить соединение с брокером. Он повторяет попытку подключения в течение некоторого времени, пока не истечет время ожидания, после чего контейнер будет остановлен. Поэтому я не могу зарегистрироваться и прочитать свои схемы avro для (де-)сериализации в моем тесте, который из-за этого терпит неудачу.
Я не могу найти причину, по которой SR, очевидно, может подключиться к ZK, но не может найти моего брокера.
Кто-то тоже сталкивался с этой проблемой? Вам удалось это запустить? Если да, то как?
Мне нужно, чтобы Kafka и тестовые контейнеры реестра схемы были полностью доступны для моих тестов, поэтому исключение любого из них не является вариантом.
Я также мог бы продолжать использовать docker-compose.yml, но мне бы очень хотелось настроить свою тестовую среду полностью программно.
Контейнер реестра схемы регистрирует следующее:
2023-02-08 16:56:09 [2023-02-08 15:56:09,556] INFO Session establishment complete on server zookeeper/192.168.144.2:2181, session id = 0x1000085b81e0003, negotiated timeout = 40000 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO Session: 0x1000085b81e0003 closed (org.apache.zookeeper.ZooKeeper)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO EventThread shut down for session: 0x1000085b81e0003 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,787] INFO AdminClientConfig values:
/* Omitted for brevity */
(org.apache.kafka.clients.admin.AdminClientConfig)
2023-02-08 16:56:10 [2023-02-08 15:56:10,284] INFO Kafka version: 7.3.1-ccs (org.apache.kafka.common.utils.AppInfoParser)
2023-02-08 16:56:10 [2023-02-08 15:56:10,284] INFO Kafka commitId: 8628b0341c3c4676 (org.apache.kafka.common.utils.AppInfoParser)
2023-02-08 16:56:10 [2023-02-08 15:56:10,284] INFO Kafka startTimeMs: 1675871770281 (org.apache.kafka.common.utils.AppInfoParser)
2023-02-08 16:56:10 [2023-02-08 15:56:10,308] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:10 [2023-02-08 15:56:10,313] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:54776) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
/* These lines repeat a few times until the container times out and exits. */
2023-02-08 16:56:50 [2023-02-08 15:56:50,144] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:50 [2023-02-08 15:56:50,144] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:54776) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:50 [2023-02-08 15:56:50,298] ERROR Error while getting broker list. (io.confluent.admin.utils.ClusterStatus)
2023-02-08 16:56:50 java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2023-02-08 16:56:50 at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
2023-02-08 16:56:50 at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
2023-02-08 16:56:50 at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
2023-02-08 16:56:50 at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:147)
2023-02-08 16:56:50 at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:149)
2023-02-08 16:56:50 Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2023-02-08 16:56:51 [2023-02-08 15:56:51,103] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:51 [2023-02-08 15:56:51,103] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:54776) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:51 [2023-02-08 15:56:51,300] INFO Expected 1 brokers but found only 0. Trying to query Kafka for metadata again ... (io.confluent.admin.utils.ClusterStatus)
2023-02-08 16:56:51 [2023-02-08 15:56:51,300] ERROR Expected 1 brokers but found only 0. Brokers found []. (io.confluent.admin.utils.ClusterStatus)
2023-02-08 16:56:51 Using log4j config /etc/schema-registry/log4j.properties
Мой базовый тестовый класс. ИТ-специалисты, которым необходимо расширить этот класс Kafka
@Testcontainers
@SpringBootTest
@Slf4j
public class AbstractIT {
private static final Network network = Network.newNetwork();
protected static GenericContainer ZOOKEEPER = new GenericContainer(
DockerImageName.parse("confluentinc/cp-zookeeper:7.2.0"))
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withEnv(Map.of(
"ZOOKEEPER_CLIENT_PORT", "2181",
"ZOOKEEPER_TICK_TIME", "2000"));
protected static final KafkaContainer KAFKA = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka"))
.withExternalZookeeper("zookeeper:2181")
.dependsOn(ZOOKEEPER)
.withNetwork(network)
.withNetworkAliases("broker");
protected static final GenericContainer SCHEMAREGSISTRY = new GenericContainer(
DockerImageName.parse("confluentinc/cp-schema-registry"))
.dependsOn(ZOOKEEPER, KAFKA)
.withEnv(Map.of(
"SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL", "zookeeper:2181",
"SCHEMA_REGISTRY_HOST_NAME", "schemaregistry",
"SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085",
"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:9092"))
.withNetwork(network)
.withNetworkAliases("schemaregistry");
@DynamicPropertySource
static void registerPgProperties(DynamicPropertyRegistry registry) {
registry.add("bootstrap.servers", KAFKA::getBootstrapServers);
registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
registry.add("spring.kafka.consumer.auto-offset-reset", () -> "earliest");
registry.add("spring.data.mongodb.uri", MONGODB::getConnectionString);
registry.add("spring.data.mongodb.database", () ->"test");
}
//container startup, shutdown as well as topic creation omitted for brevity
}
Мой docker-compose.yml, который я хочу реплицировать с помощью тестовых контейнеров
version: "3.5"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.2.0
hostname: broker
container_name: broker
restart: always
depends_on:
- zookeeper
ports:
- "29092:29092"
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_SCHEMA_REGISTRY_URL: "schemaregistry:8085"
schemaregistry:
container_name: schemaregistry
hostname: schemaregistry
image: confluentinc/cp-schema-registry:5.1.2
restart: always
depends_on:
- zookeeper
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"
ports:
- "8085:8085"
volumes:
- "./src/main/avro/:/etc/schema"
Подробнее здесь: [url]https://stackoverflow.com/questions/75388650/testcontainers-schemaregistry-cant-connect-to-kafka-container[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия