После их создания APP1 отправит событие в APP2, чтобы начать обработку этого автомобиля (непрерывная асинхронная задача), и время от времени APP2 также будет отправлять некоторые события тема автомобиля возвращается в APP1, чтобы сообщить о некоторых событиях в этом автомобиле. Поскольку в какой-то момент у APP2 закончится емкость, у меня будет дополнительный экземпляр, но я не уверен, как лучше всего управлять связью APP1 -> APP2 с несколькими экземплярами APP2, имеющими емкость (потому что в какой-то момент очень переполненный экземпляр может стать менее переполненным).
Потому что, если у меня есть все экземпляры в одной группе потребителей, только один получит сообщение о начале обработки автомобиля, а если у него нет емкость, как я могу найти другой экземпляр для выполнения задачи?
Еще одна идея, о которой я думаю, заключается в том, что когда я отправляю событие из APP1, чтобы иметь экземпляры APP2 у разных потребителей группы, поэтому каждый экземпляр получит событие для запуска машины обработки, но как убедиться, что его обрабатывает только один из них?
РЕДАКТИРОВАТЬ: добавлен код
Что у меня есть на данный момент:
создание двух тем для автомобиля, когда он создается в APP1
Код: Выделить всё
public void createCarTopics(String carName) {
try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
String controlTopic = carName + "_control";
String detectionTopic = carName + "_detection";
int partitions = 1;
adminClient.createTopics(Arrays.asList(
new NewTopic(controlTopic, partitions, replicationFactor),
new NewTopic(detectionTopic, partitions, replicationFactor)
));
}
}
Код: Выделить всё
@Service
public class KafkaProducerImpl implements KafkaProducer {
private final KafkaTemplate kafkaTemplate;
@Override
public void sendCarControlEvent(CarControlEvent event) {
String carTopic = event.getCarName() + "_control"; // Single topic per car for APP11 -> APP2 communication
kafkaTemplate.send(carTopic, event);
}
}
Код: Выделить всё
@KafkaListener(topics = "#{@kafkaConsumerConfig.controlTopicForCar('carName')}", groupId = "${spring.kafka.consumer.group-id}")
public void listenCarControl(String messageEvent) {
try {
CarControlEvent event = objectMapper.readValue(messageEvent, CarControlEvent.class);
switch (event.getActionType()) {
case START_CAR_PROCESSING:
startCar(event);
break;
case STOP_CAR_PROCESSING:
stopCar(event);
break;
}
} catch (JsonProcessingException e) {
}
}
@Component
public class KafkaConsumerConfig {
public String controlTopicForCar(String carName) {
return carName + "_control";
}
public String detectionTopicForCar(String carName) {
return carName + "_detection";
}
}
Я открыт для другой стратегии реализации, если есть что-то, что поможет мне обработать автомобили в несколько экземпляров.
Подробнее здесь: https://stackoverflow.com/questions/791 ... ka-message
Мобильная версия