Каков более чистый и эффективный способ реализации проверки работоспособности Kafka в моем приложении весенней загрузки?JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Каков более чистый и эффективный способ реализации проверки работоспособности Kafka в моем приложении весенней загрузки?

Сообщение Anonymous »

У меня есть приложение Spring Boot (2.1.6), которое одновременно обрабатывает и создает сообщения для общего экземпляра Kafka (в масштабе всей организации). Я пытаюсь реализовать проверку работоспособности этого брокера Kafka в своем приложении с помощью пружинного привода, и сталкиваюсь с рядом проблем, связанных с производительностью и ведением журнала. В Spring Boot 2.0 был встроен индикатор работоспособности, но его удалили из-за некоторых очевидных проблем.

Вот класс проверки работоспособности, который я реализовал:

Код: Выделить всё

@Component
public class KafkaHealthCheck implements HealthIndicator {

private static final Logger logger = LoggerFactory.getLogger(KafkaHealthCheck.class);

private KafkaAdmin kafkaAdmin;

private Map kafkaConfig;

@Value("${application.topic}")
private String topicName;

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

public KafkaHealthCheck(KafkaAdmin kafkaAdmin) {
this.kafkaAdmin = kafkaAdmin;

}

@PostConstruct
public void setUpAdminClient() {
kafkaConfig = new HashMap();
kafkaConfig.putAll(kafkaAdmin.getConfig());
kafkaConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
}

@Override
public Health health() {
Long start = System.currentTimeMillis();
try (AdminClient adminClient = AdminClient.create(kafkaConfig)) {

DescribeClusterOptions describeClusterOptions = new DescribeClusterOptions().timeoutMs(2000);
adminClient.describeCluster(describeClusterOptions);

adminClient.describeConsumerGroups(List.of("topic")).all()
.get(2, TimeUnit.SECONDS);

Map topicDescriptionMap = adminClient
.describeTopics(List.of(topicName)).all().get(2, TimeUnit.SECONDS);

List partitions = topicDescriptionMap.get(topicName)
.partitions();

if (partitions == null || partitions.isEmpty()) {
logger.warn(String
.format("Kafka healthcheck failed - No partition found for topic: %s", topicName));
return Health.down()
.withDetail("Kafka healthcheck failed", "No partition found for topic: " + topicName)
.build();
} else {
if (partitions.stream().anyMatch(p -> p.leader() == null)) {
logger.warn(
String.format("Kafka healthcheck failed - No partition leader found for topic: %s",
topicName));
return Health.down().withDetail("Kafka healthcheck failed",
"No partition leader found for topic: " + topicName).build();
}
}
} catch (Exception e) {
logger.warn("Kafka healthcheck failed", e);
return Health.down()
.withDetail("Kafka healthcheck failed", "Exception occurred during healthcheck").build();
}
System.out.println(System.currentTimeMillis() - start);
return Health.up().build();
}
}
Теперь у меня есть вопросы или проблемы, с которыми я столкнулся при этой реализации:

1 – KafkaAdmin внедряется в этот класс со всей имеющейся у меня конфигурацией (я использую SSL), кроме «bootstrap.servers». Я понял, что org.springframework.boot.autoconfigure.kafka.KafkaProperties имеет localhost:9092 по умолчанию, который каким-то образом не переопределяется конфигурацией приложения, хотя он отлично работает для потребителя и производителя. Я понятия не имею, почему это так, и поэтому мне придется настроить это здесь вручную.

2 – я добавил тайм-ауты в DescribeClusterOptions и DescribeConsumerGroups но эти тайм-ауты, похоже, полностью игнорируются. Если я отключу брокер вручную, проверка работоспособности займет около пары минут, чтобы сообщить об ошибке.

3. Из-за ошибки bootstrap.servers, когда я фактически развернул приложение, оно почти убило мой сервер журналов миллионами строк журнала, сгенерированных org.apache.kafka.clients.NetworkClient, в которых говорилось, что соединение с узлом -1 не может быть установлено. Брокер может быть недоступен.. Как я могу предотвратить повторение этого? Даже в тех случаях, когда брокер выходит из строя во время операций.

4. Даже успешная проверка работоспособности создает много строк журнала при создании AdminClient. Он выводит всю прочитанную конфигурацию и кучу других операторов. Есть ли шанс свести это к минимуму?

5 - В целом, это очень медленно. Я пытался подсчитать время, необходимое для запуска только этой проверки работоспособности, и в среднем оно составляет около 1,5 секунд. Есть ли шанс его оптимизировать?

Подробнее здесь: https://stackoverflow.com/questions/592 ... r-kafka-in
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»