О, нет! ЙО! Проблемы параллелизма с виртуальными потоками, HikariCP и Spring Boot JDBC (Postgres)JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 О, нет! ЙО! Проблемы параллелизма с виртуальными потоками, HikariCP и Spring Boot JDBC (Postgres)

Сообщение Anonymous »

Я столкнулся с проблемой при использовании следующих технологий.
  • PostgreSQL запускается через Docker-контейнер
  • Project Loom
  • Spring Boot 3.3.4
  • Java 21
  • Spring Boot Starter JDBC (который включает HikariCp - которого я считаю источником проблемы)
  • Spring Boot Starter JPA
И, очевидно, перечень других вещей. По запросу могу предоставить POM, но не думаю, что это будет полезно. Мой драйвер postgres — UTD, как и все мои версии.
Основная проблема заключается в том, что при выполнении одновременных операций на высоких скоростях я сталкиваюсь с проблемами ввода-вывода с HikariCp при попытке сохранить элементы в моя база данных. Я не совсем уверен, что делать.
В журналах моего микросервиса:

Код: Выделить всё

2024-09-19T18:50:51.617-04:00  WARN 10060 --- [virtual-ingest] [    virtual-342] com.zaxxer.hikari.pool.ProxyConnection   : HikariPool-1 - Connection org.postgresql.jdbc.PgConnection@28b26c3f marked as broken because of SQLSTATE(08006), ErrorCode(0)

org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
**В журналах Docker для PG (постоянно повторяется):
**

Код: Выделить всё

2024-09-19 22:50:51.807 UTC [412] LOG:  unexpected EOF on client connection with an open transaction

Мой код (я еще не возвращался, чтобы его почистить):

Код: Выделить всё

@EventListener(ApplicationReadyEvent.class)
public void startStructures()
{
try (var outerScope = new StructuredTaskScope.ShutdownOnFailure())
{
sqsConfig.getQueues().forEach(queue -> outerScope.fork(() -> {
processQueueMessages(queue);
return null;
}));
outerScope.join();
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
log.error("Processing interrupted", e);
}
}

private void processQueueMessages(String queue)
{
while (true)
{
try (var innerScope = new StructuredTaskScope.ShutdownOnFailure())
{
List messages = retrieveMessages(queue);
if (messages.isEmpty())
{
log.info("No messages in queue: {}, sleeping...", queue);
Thread.sleep(5000);
}
else
{
for (Message message : messages)
{
innerScope.fork(() -> {
processMessage(queue, message);
return null;
});
}
innerScope.join();
}
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
log.error("Queue processing interrupted", e);
break;
}
}
}

public List retrieveMessages(String queue)
{
var url = awsConfig.getBaseUrl() + queue;
ReceiveMessageRequest request = ReceiveMessageRequest.builder()
.queueUrl(url)
.maxNumberOfMessages(10)
.waitTimeSeconds(
10)
.build();

ReceiveMessageResponse response = sqsClient.receiveMessage(request);
return response.messages();
}

private void processMessage(String queue, Message message) throws JsonProcessingException
{

log.info("Processing message from queue {}: {}", queue, message.messageId());

JsonNode node = nodeBuilderService.buildNode(message);
distributionService.handleSqsNotification(node);
deleteMessage(queue, message);
}

private void deleteMessage(String queue, Message message)
{
var url = awsConfig.getBaseUrl() + queue;
sqsClient.deleteMessage(builder -> builder.queueUrl(url).receiptHandle(message.receiptHandle()));

int remainingMessages = getMessageCount(queue);
log.info(
"Deleted message: {}.  Approximate {} messages remaining in the queue.", message.messageId(),
remainingMessages
);
}

public int getMessageCount(String queueName)
{
var url = awsConfig.getBaseUrl() + queueName;

GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
.queueUrl(url)
.attributeNames(
QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
.build();

return Integer.parseInt(
sqsClient.getQueueAttributes(request)
.attributes()
.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
);
}
При обработке сообщений мой код в конечном итоге обращается к этому методу:

Код: Выделить всё

@Transactional
public void process(String filename, InputStream stream)
throws IOException
{
byte[] bytes = getBytesFromInputStream(stream, filename);

List observations = decoder.beginSynopticDecoders(bytes, filename);
repository.saveAllAndFlush(observations);
}
Именно здесь возникает ошибка.
Я пытался принудительно перенести это в поток платформы, но считаю, что это связано с отсутствием поддержки HikariCP. для виртуальных потоков, даже в текущих версиях. Я наблюдал за кафе JEP в свои телескопы и хотел бы знать, сможет ли кто-нибудь помочь мне ответить на несколько вопросов.
  • Делаю ли я что-то явно неправильно? Я перехожу с асинхронных шаблонов и хочу никогда больше их не использовать, поэтому велика вероятность, что я не знаю, что мне делать.
  • Мне интересно, может ли ExtentLocal помочь . Я думаю, что проблема в Hikari может быть связана с использованием ThreadLocal. но опять же, я новичок в этом деле.
  • Нужно ли мне отказаться от Spring jdbc? Что бы я использовал вместо этого? На момент написания этой статьи у R2DBC (я почти уверен) все еще не было хорошей системы ORM, даже с реактивным режимом Hibernate.
Я пробовал потоки платформы, Я попробовал область только для транзакций БД. Я пробовал много вещей часами, и у меня такой кодовый мозг, что теперь я даже не уверен, что я пробовал. Я даже пытался использовать ChatGPT, но, как и ожидалось, это не помогло.

Подробнее здесь: https://stackoverflow.com/questions/790 ... ng-boot-jd
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»