Он состоит из следующих компонентов:
- Отправитель электронного письма, составляющий часть электронного письма, отправляет его на SMTP-сервер:
Код: Выделить всё
@Service
@RequiredArgsConstructor
public class MessageSender {
private final JavaMailSender sender;
public void sendMessage(final RabbitEmailDto emailDto) {
MimeMessage message = sender.createMimeMessage();
message.setRecipients(Message.RecipientType.TO, emailDto.getTo());
MimeMessageHelper helper = new MimeMessageHelper(message, CharEncoding.UTF_8);
helper.setSubject(emailDto.getData().getEmail().getSubject());
helper.setText(emailDto.getHtml(), true);
helper.setFrom(emailDto.getFrom());
sender.send(message);
}
}
- Обработчик сообщений, который получает список сообщений RabbitMQ, для каждого из них вызывает отправителя сообщения в отдельном виртуальном потоке и возвращает список будущих результатов отправки писем
Код: Выделить всё
@Service
@RequiredArgsConstructor
public class MessageProcessor {
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
private final MessageSender messageSender;
private final Jackson2JsonMessageConverter converter;
public List processMessages(List messages) {
List processingFutures = new ArrayList();
for (Message message : messages) {
MessageProcessingFuture messageProcessingFuture = new MessageProcessingFuture(
message.getMessageProperties().getDeliveryTag(),
processMessageAsync(message, executor)
);
processingFutures.add(messageProcessingFuture);
}
return processingFutures;
}
private Future processMessageAsync(final Message message) {
RabbitEmailDto rabbitEmailDto = (RabbitEmailDto) converter.fromMessage(message);
MessageDeliverySystemEmailDto email = rabbitEmailDto.getData().getEmail();
return executor.submit(() -> messageSender.sendMessage(rabbitEmailDto));
}
}
- Прослушиватель сообщений RabbitMQ, который принимает сообщения из очереди Rabbit, передает их процессору, а затем обрабатывает фьючерсы, полученные от процессора. отправив подтверждение или отказ RabbitMQ в зависимости от того, создал ли Future.get() исключение или нет.
Код: Выделить всё
@Component
@RequiredArgsConstructor
public class BatchMessageListener implements ChannelAwareBatchMessageListener {
private final MessageProcessor messageProcessor;
@Override
@MeasureExecutionTime
public void onMessageBatch(final List messages, final Channel channel) {
messageProcessor.processMessages(messages)
.forEach(processingFuture -> processFuture(processingFuture, channel));
}
private void processFuture(final MessageProcessingFuture future, final Channel channel) {
try {
future.deliveryFuture().get();
channel.basicAck(future.deliveryTag(), false);
} catch (Exception e) {
channel.basicReject(future.deliveryTag(), false);
}
}
}
И я вижу, что у меня есть 4 таких рабочих на нашем рабочем сервере. (Правильно ли я, что эти рабочие потоки являются реальными потоками платформы или потоками-носителями?)
Я также вижу, что методу MessageSender.sendMessage обычно требуется около 1 секунды. завершено, причем большая часть этого времени уходит на ожидание ответа от SMTP-сервера.
Основываясь на том, что я узнал о виртуальных потоках, я ожидал, что обработка пакета из 100 сообщений (это это мой настроенный размер пакета для BatchMessageListener) займет около 1 секунды, поскольку потоки платформы не будут блокировать вызовы SMTP-сервера. И эти 4 потока платформы будут совместно использоваться 100 виртуальными потоками, что фактически позволит осуществлять 100 почти одновременных вызовов SMTP-сервера.
Однако на практике я заметил, что сообщения обрабатываются 4 за раз, а обработка всех 100 сообщений занимает около 25 секунд.
Во время локального тестирования на моем компьютере я намеренно ввел задержку в 1 секунду, добавив Thread.sleep( 1000); перед строкой sender.send(message); в MessageSender для имитации задержки в сети. И тогда пакет из 100 сообщений действительно был обработан буквально за 1 секунду, несмотря на то, что по логам у меня было всего 10 потоков несущей.
Я в недоумении. Почему потоки-носители не блокируют вызов Thead.sleep, а блокируют вызов внешней службы? Я делаю что-то не так?
Подробнее здесь: https://stackoverflow.com/questions/783 ... rvice-call