Виртуальные потоки Java 21, похоже, блокируют потоки несущей при вызове внешней службыJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Виртуальные потоки Java 21, похоже, блокируют потоки несущей при вызове внешней службы

Сообщение Anonymous »

У меня есть микросервис Spring Boot, который принимает сообщения из RabbitMQ, составляет электронные письма и отправляет их на SMTP-сервер.
Он состоит из следующих компонентов:
  • Отправитель электронного письма, составляющий часть электронного письма, отправляет его на SMTP-сервер:

Код: Выделить всё

@Service
@RequiredArgsConstructor
public class MessageSender {

private final JavaMailSender sender;

public void sendMessage(final RabbitEmailDto emailDto) {
MimeMessage message = sender.createMimeMessage();
message.setRecipients(Message.RecipientType.TO, emailDto.getTo());

MimeMessageHelper helper = new MimeMessageHelper(message, CharEncoding.UTF_8);
helper.setSubject(emailDto.getData().getEmail().getSubject());
helper.setText(emailDto.getHtml(), true);
helper.setFrom(emailDto.getFrom());

sender.send(message);
}
}
  • Обработчик сообщений, который получает список сообщений RabbitMQ, для каждого из них вызывает отправителя сообщения в отдельном виртуальном потоке и возвращает список будущих результатов отправки писем

Код: Выделить всё

@Service
@RequiredArgsConstructor
public class MessageProcessor {

private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
private final MessageSender messageSender;
private final Jackson2JsonMessageConverter converter;

public List processMessages(List messages) {
List processingFutures = new ArrayList();

for (Message message : messages) {
MessageProcessingFuture messageProcessingFuture = new MessageProcessingFuture(
message.getMessageProperties().getDeliveryTag(),
processMessageAsync(message, executor)
);
processingFutures.add(messageProcessingFuture);
}

return processingFutures;
}

private Future processMessageAsync(final Message message) {
RabbitEmailDto rabbitEmailDto = (RabbitEmailDto) converter.fromMessage(message);
MessageDeliverySystemEmailDto email = rabbitEmailDto.getData().getEmail();

return executor.submit(() -> messageSender.sendMessage(rabbitEmailDto));
}
}
  • Прослушиватель сообщений RabbitMQ, который принимает сообщения из очереди Rabbit, передает их процессору, а затем обрабатывает фьючерсы, полученные от процессора. отправив подтверждение или отказ RabbitMQ в зависимости от того, создал ли Future.get() исключение или нет.

Код: Выделить всё

@Component
@RequiredArgsConstructor
public class BatchMessageListener implements ChannelAwareBatchMessageListener {

private final MessageProcessor messageProcessor;

@Override
@MeasureExecutionTime
public void onMessageBatch(final List messages, final Channel channel) {
messageProcessor.processMessages(messages)
.forEach(processingFuture -> processFuture(processingFuture, channel));
}

private void processFuture(final MessageProcessingFuture future, final Channel channel) {
try {
future.deliveryFuture().get();
channel.basicAck(future.deliveryTag(), false);
} catch (Exception e) {
channel.basicReject(future.deliveryTag(), false);
}
}
}
В журналах я вижу, что метод MessageSender.sendMessage действительно выполняется в виртуальном потоке, определенном как VirtualThread[#100]/runnable@ForkJoinPool-1-worker- 1.
И я вижу, что у меня есть 4 таких рабочих на нашем рабочем сервере. (Правильно ли я, что эти рабочие потоки являются реальными потоками платформы или потоками-носителями?)
Я также вижу, что методу MessageSender.sendMessage обычно требуется около 1 секунды. завершено, причем большая часть этого времени уходит на ожидание ответа от SMTP-сервера.
Основываясь на том, что я узнал о виртуальных потоках, я ожидал, что обработка пакета из 100 сообщений (это это мой настроенный размер пакета для BatchMessageListener) займет около 1 секунды, поскольку потоки платформы не будут блокировать вызовы SMTP-сервера. И эти 4 потока платформы будут совместно использоваться 100 виртуальными потоками, что фактически позволит осуществлять 100 почти одновременных вызовов SMTP-сервера.
Однако на практике я заметил, что сообщения обрабатываются 4 за раз, а обработка всех 100 сообщений занимает около 25 секунд.
Во время локального тестирования на моем компьютере я намеренно ввел задержку в 1 секунду, добавив Thread.sleep( 1000); перед строкой sender.send(message); в MessageSender для имитации задержки в сети. И тогда пакет из 100 сообщений действительно был обработан буквально за 1 секунду, несмотря на то, что по логам у меня было всего 10 потоков несущей.
Я в недоумении. Почему потоки-носители не блокируют вызов Thead.sleep, а блокируют вызов внешней службы? Я делаю что-то не так?

Подробнее здесь: https://stackoverflow.com/questions/783 ... rvice-call
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»