Кажется, что виртуальные потоки блокируют потоки несущей при вызове внешней службы.JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Кажется, что виртуальные потоки блокируют потоки несущей при вызове внешней службы.

Сообщение Anonymous »

У меня есть микросервис Spring Boot, который принимает сообщения из RabbitMQ, составляет электронные письма и отправляет их на SMTP-сервер.
Он состоит из следующих компонентов:
  • Отправитель электронного письма, составляющий часть электронного письма, отправляет его на SMTP-сервер:

Код: Выделить всё

@Service
@RequiredArgsConstructor
public class MessageSender {

private final JavaMailSender sender;

public void sendMessage(final RabbitEmailDto emailDto) {
MimeMessage message = sender.createMimeMessage();
message.setRecipients(Message.RecipientType.TO, emailDto.getTo());

MimeMessageHelper helper = new MimeMessageHelper(message, CharEncoding.UTF_8);
helper.setSubject(emailDto.getData().getEmail().getSubject());
helper.setText(emailDto.getHtml(), true);
helper.setFrom(emailDto.getFrom());

sender.send(message);
}
}
  • Обработчик сообщений, который получает список сообщений RabbitMQ, для каждого из них вызывает отправителя сообщения в отдельном виртуальном потоке и возвращает список будущих результатов отправки писем

Код: Выделить всё

@Service
@RequiredArgsConstructor
public class MessageProcessor {

private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
private final MessageSender messageSender;
private final Jackson2JsonMessageConverter converter;

public List processMessages(List messages) {
List processingFutures = new ArrayList();

for (Message message : messages) {
MessageProcessingFuture messageProcessingFuture = new MessageProcessingFuture(
message.getMessageProperties().getDeliveryTag(),
processMessageAsync(message, executor)
);
processingFutures.add(messageProcessingFuture);
}

return processingFutures;
}

private Future processMessageAsync(final Message message) {
RabbitEmailDto rabbitEmailDto = (RabbitEmailDto) converter.fromMessage(message);
MessageDeliverySystemEmailDto email = rabbitEmailDto.getData().getEmail();

return executor.submit(() -> messageSender.sendMessage(rabbitEmailDto));
}
}
  • Прослушиватель сообщений RabbitMQ, который принимает сообщения из очереди Rabbit, передает их процессору, а затем обрабатывает фьючерсы, полученные от процессора. отправив подтверждение или отказ RabbitMQ в зависимости от того, создал ли Future.get() исключение или нет.

Код: Выделить всё

@Component
@RequiredArgsConstructor
public class BatchMessageListener implements ChannelAwareBatchMessageListener {

private final MessageProcessor messageProcessor;

@Override
@MeasureExecutionTime
public void onMessageBatch(final List messages, final Channel channel) {
messageProcessor.processMessages(messages)
.forEach(processingFuture -> processFuture(processingFuture, channel));
}

private void processFuture(final MessageProcessingFuture future, final Channel channel) {
try {
future.deliveryFuture().get();
channel.basicAck(future.deliveryTag(), false);
} catch (Exception e) {
channel.basicReject(future.deliveryTag(), false);
}
}
}
В журналах я вижу, что метод MessageSender.sendMessage действительно выполняется в виртуальном потоке, определенном как VirtualThread[#100]/runnable@ForkJoinPool-1-worker- 1.
И я вижу, что у меня есть 4 таких рабочих на нашем рабочем сервере. (Правильно ли я, что эти рабочие потоки являются реальными потоками платформы или потоками-носителями?)
Я также вижу, что методу MessageSender.sendMessage обычно требуется около 1 секунды. завершено, причем большая часть этого времени уходит на ожидание ответа от SMTP-сервера.
Основываясь на том, что я узнал о виртуальных потоках, я ожидал, что обработка пакета из 100 сообщений (это это мой настроенный размер пакета для BatchMessageListener) займет около 1 секунды, поскольку потоки платформы не будут блокировать вызовы SMTP-сервера. И эти 4 потока платформы будут совместно использоваться 100 виртуальными потоками, что фактически позволит осуществлять 100 почти одновременных вызовов SMTP-сервера.
Однако на практике я заметил, что сообщения обрабатываются 4 за раз, а обработка всех 100 сообщений занимает около 25 секунд.
Во время локального тестирования на моем компьютере я намеренно ввел задержку в 1 секунду, добавив Thread.sleep( 1000); перед строкой sender.send(message); в MessageSender для имитации задержки в сети. И тогда пакет из 100 сообщений действительно был обработан буквально за 1 секунду, несмотря на то, что по логам у меня было всего 10 потоков несущей.
Я в недоумении. Почему потоки-носители не блокируют вызов Thead.sleep, а блокируют вызов внешней службы? Я делаю что-то не так?

Подробнее здесь: https://stackoverflow.com/questions/783 ... rvice-call
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»