Как остановить переподключение kafka Adminclient?JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Как остановить переподключение kafka Adminclient?

Сообщение Anonymous »

Я пытаюсь написать проверку здоровья на Кафку, которая периодически работает, проверяет связь и сообщаю о любых проблемах с помощью оповещений Прометея. Моя идея о том, как это сделать, заключалась в том, чтобы просто сделать запрос на Кафку в блоке Try Catch и рассматривать любое исключение как потерянное подключение, код ниже. < /P>
@Scheduled(fixedRate = 1000)
private void checkEventHubConnection() {
try {
System.out.println("Checking eventHub connection...");
adminClient.listTopics().names().get();
eventHubStatus = 1;
} catch (ExecutionException ee) {
eventHubStatusDown(ee);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
eventHubStatusDown(ie);
}
}
< /code>
Моя проблема с модульными тестами. Проверка положительного случая достаточно проста и работает. Однако, когда я хочу отключить Enceddedkafka, она застряла в петле, пытаясь воссоединиться в течение 60 секунд. Что я нахожу еще более странным, так это то, что в течение этих 60 секунд воссоединения моя запланированная задача CeckeVenthubConnection вообще не работает. Он работает только один раз в начале и один раз в конце, после 60 секунд. Я подозреваю, что это потому, что операция повторной попытки блокирует поток, где выполняется запланированная задача? Ниже приведена моя тестовая настройка и сам тест. < /P>
public class TestTest {

private EventHubHealthCheck eventHubHealthCheck;
private SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
private AdminClient adminClient;

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

@Autowired
private MockMvc mockMvc;

@BeforeAll
void beforeAll() {
eventHubHealthCheck = new EventHubHealthCheck(meterRegistry);
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
adminClient = AdminClient.create(props);
}

@AfterAll
void tearDown() {
if (adminClient != null) {
adminClient.close();
}
}

@Test
void shouldReportEventHubStatusDownWhenKafkaIsStopped() throws Exception {
embeddedKafka.destroy();

Thread.sleep(60000);

mockMvc.perform(get("/actuator/prometheus"))
.andExpect(status().isOk())
.andExpect(content().string(containsString("event_hub_connectivity_status 0.0")));
}


Подробнее здесь: https://stackoverflow.com/questions/794 ... ct-retries
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»