Найдите другой экземпляр для обработки сообщения Кафки.JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Найдите другой экземпляр для обработки сообщения Кафки.

Сообщение Anonymous »

У меня есть 2 приложения SpringBoot, развернутые в AWS, APP1 управляет автомобилями, и для каждого нового добавленного автомобиля оно создает 2 темы Kafka, которые будут использоваться только для этого автомобиля (тема A для событий APP1 -> APP2 и тема B для событий APP2 -> APP1).
После их создания APP1 отправит событие в APP2, чтобы начать обработку этого автомобиля (непрерывная асинхронная задача), и время от времени APP2 также будет отправлять некоторые события тема автомобиля возвращается в APP1, чтобы сообщить о некоторых событиях в этом автомобиле. Поскольку в какой-то момент у APP2 закончится емкость, у меня будет дополнительный экземпляр, но я не уверен, как лучше всего управлять связью APP1 -> APP2 с несколькими экземплярами APP2, имеющими емкость (потому что в какой-то момент очень переполненный экземпляр может стать менее переполненным).
Потому что, если у меня есть все экземпляры в одной группе потребителей, только один получит сообщение о начале обработки автомобиля, а если у него нет емкость, как я могу найти другой экземпляр для выполнения задачи?
Еще одна идея, о которой я думаю, заключается в том, что когда я отправляю событие из APP1, чтобы иметь экземпляры APP2 у разных потребителей группы, поэтому каждый экземпляр получит событие для запуска машины обработки, но как убедиться, что его обрабатывает только один из них?
РЕДАКТИРОВАТЬ: добавлен код
Что у меня есть на данный момент:
создание двух тем для автомобиля, когда он создается в APP1

Код: Выделить всё

  public void createCarTopics(String carName) {
try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
String controlTopic = carName + "_control";
String detectionTopic = carName + "_detection";
int partitions = 1;

adminClient.createTopics(Arrays.asList(
new NewTopic(controlTopic, partitions, replicationFactor),
new NewTopic(detectionTopic, partitions, replicationFactor)
));
}
}

Код: Выделить всё

@Service
public class KafkaProducerImpl implements KafkaProducer {

private final KafkaTemplate kafkaTemplate;

@Override
public void sendCarControlEvent(CarControlEvent event) {
String carTopic = event.getCarName() + "_control"; // Single topic per car for APP11 -> APP2 communication
kafkaTemplate.send(carTopic, event);
}
}
А внутри APP2 я слушаю вот так

Код: Выделить всё

@KafkaListener(topics = "#{@kafkaConsumerConfig.controlTopicForCar('carName')}", groupId = "${spring.kafka.consumer.group-id}")
public void listenCarControl(String messageEvent) {
try {
CarControlEvent event = objectMapper.readValue(messageEvent, CarControlEvent.class);

switch (event.getActionType()) {
case START_CAR_PROCESSING:
startCar(event);
break;
case STOP_CAR_PROCESSING:
stopCar(event);
break;
}
} catch (JsonProcessingException e) {
}
}

@Component
public class KafkaConsumerConfig {

public String controlTopicForCar(String carName) {
return carName + "_control";
}

public String detectionTopicForCar(String carName) {
return carName + "_detection";
}
}
В этом подходе у меня есть один раздел и все потребители в одной группе, поэтому при наличии нескольких экземпляров мы получаем управляющие сообщения в одном потребителе. Но проблема в том, что произойдет, если у меня не будет мощности для обработки этого экземпляра и нам придется искать другой экземпляр?
Я открыт для другой стратегии реализации, если есть что-то, что поможет мне обработать автомобили в несколько экземпляров.

Подробнее здесь: https://stackoverflow.com/questions/791 ... ka-message
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»