Я пытаюсь написать проверку здоровья на Кафку, которая периодически работает, проверяет связь и сообщаю о любых проблемах с помощью оповещений Прометея. Моя идея о том, как это сделать, заключалась в том, чтобы просто сделать запрос на Кафку в блоке Try Catch и рассматривать любое исключение как потерянное подключение, код ниже. < /P>
@Scheduled(fixedRate = 1000)
private void checkEventHubConnection() {
try {
System.out.println("Checking eventHub connection...");
adminClient.listTopics().names().get();
eventHubStatus = 1;
} catch (ExecutionException ee) {
eventHubStatusDown(ee);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
eventHubStatusDown(ie);
}
}
< /code>
Моя проблема с модульными тестами. Проверка положительного случая достаточно проста и работает. Однако, когда я хочу отключить Enceddedkafka, она застряла в петле, пытаясь воссоединиться в течение 60 секунд. Что я нахожу еще более странным, так это то, что в течение этих 60 секунд воссоединения моя запланированная задача CeckeVenthubConnection вообще не работает. Он работает только один раз в начале и один раз в конце, после 60 секунд. Я подозреваю, что это потому, что операция повторной попытки блокирует поток, где выполняется запланированная задача? Ниже приведена моя тестовая настройка и сам тест. < /P>
public class TestTest {
private EventHubHealthCheck eventHubHealthCheck;
private SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
private AdminClient adminClient;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Autowired
private MockMvc mockMvc;
@BeforeAll
void beforeAll() {
eventHubHealthCheck = new EventHubHealthCheck(meterRegistry);
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
adminClient = AdminClient.create(props);
}
@AfterAll
void tearDown() {
if (adminClient != null) {
adminClient.close();
}
}
@Test
void shouldReportEventHubStatusDownWhenKafkaIsStopped() throws Exception {
embeddedKafka.destroy();
Thread.sleep(60000);
mockMvc.perform(get("/actuator/prometheus"))
.andExpect(status().isOk())
.andExpect(content().string(containsString("event_hub_connectivity_status 0.0")));
}
Подробнее здесь: https://stackoverflow.com/questions/794 ... ct-retries
Как остановить переподключение kafka Adminclient? ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение