- PostgreSQL запускается через Docker-контейнер
- Project Loom
- Spring Boot 3.3.4
- Java 21
- Spring Boot Starter JDBC (который включает HikariCp - которого я считаю источником проблемы)
- Spring Boot Starter JPA
Основная проблема заключается в том, что при выполнении одновременных операций на высоких скоростях я сталкиваюсь с проблемами ввода-вывода с HikariCp при попытке сохранить элементы в моя база данных. Я не совсем уверен, что делать.
В журналах моего микросервиса:
Код: Выделить всё
2024-09-19T18:50:51.617-04:00 WARN 10060 --- [virtual-ingest] [ virtual-342] com.zaxxer.hikari.pool.ProxyConnection : HikariPool-1 - Connection org.postgresql.jdbc.PgConnection@28b26c3f marked as broken because of SQLSTATE(08006), ErrorCode(0)
org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
**
Код: Выделить всё
2024-09-19 22:50:51.807 UTC [412] LOG: unexpected EOF on client connection with an open transaction
Код: Выделить всё
@EventListener(ApplicationReadyEvent.class)
public void startStructures()
{
try (var outerScope = new StructuredTaskScope.ShutdownOnFailure())
{
sqsConfig.getQueues().forEach(queue -> outerScope.fork(() -> {
processQueueMessages(queue);
return null;
}));
outerScope.join();
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
log.error("Processing interrupted", e);
}
}
private void processQueueMessages(String queue)
{
while (true)
{
try (var innerScope = new StructuredTaskScope.ShutdownOnFailure())
{
List messages = retrieveMessages(queue);
if (messages.isEmpty())
{
log.info("No messages in queue: {}, sleeping...", queue);
Thread.sleep(5000);
}
else
{
for (Message message : messages)
{
innerScope.fork(() -> {
processMessage(queue, message);
return null;
});
}
innerScope.join();
}
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
log.error("Queue processing interrupted", e);
break;
}
}
}
public List retrieveMessages(String queue)
{
var url = awsConfig.getBaseUrl() + queue;
ReceiveMessageRequest request = ReceiveMessageRequest.builder()
.queueUrl(url)
.maxNumberOfMessages(10)
.waitTimeSeconds(
10)
.build();
ReceiveMessageResponse response = sqsClient.receiveMessage(request);
return response.messages();
}
private void processMessage(String queue, Message message) throws JsonProcessingException
{
log.info("Processing message from queue {}: {}", queue, message.messageId());
JsonNode node = nodeBuilderService.buildNode(message);
distributionService.handleSqsNotification(node);
deleteMessage(queue, message);
}
private void deleteMessage(String queue, Message message)
{
var url = awsConfig.getBaseUrl() + queue;
sqsClient.deleteMessage(builder -> builder.queueUrl(url).receiptHandle(message.receiptHandle()));
int remainingMessages = getMessageCount(queue);
log.info(
"Deleted message: {}. Approximate {} messages remaining in the queue.", message.messageId(),
remainingMessages
);
}
public int getMessageCount(String queueName)
{
var url = awsConfig.getBaseUrl() + queueName;
GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
.queueUrl(url)
.attributeNames(
QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
.build();
return Integer.parseInt(
sqsClient.getQueueAttributes(request)
.attributes()
.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
);
}
Код: Выделить всё
@Transactional
public void process(String filename, InputStream stream)
throws IOException
{
byte[] bytes = getBytesFromInputStream(stream, filename);
List observations = decoder.beginSynopticDecoders(bytes, filename);
repository.saveAllAndFlush(observations);
}
Я пытался принудительно перенести это в поток платформы, но считаю, что это связано с отсутствием поддержки HikariCP. для виртуальных потоков, даже в текущих версиях. Я наблюдал за кафе JEP в свои телескопы и хотел бы знать, сможет ли кто-нибудь помочь мне ответить на несколько вопросов.
- Делаю ли я что-то явно неправильно? Я перехожу с асинхронных шаблонов и хочу никогда больше их не использовать, поэтому велика вероятность, что я не знаю, что мне делать.
- Мне интересно, может ли ExtentLocal помочь . Я думаю, что проблема в Hikari может быть связана с использованием ThreadLocal. но опять же, я новичок в этом деле.
- Нужно ли мне отказаться от Spring jdbc? Что бы я использовал вместо этого? На момент написания этой статьи у R2DBC (я почти уверен) все еще не было хорошей системы ORM, даже с реактивным режимом Hibernate.
Подробнее здесь: https://stackoverflow.com/questions/790 ... ng-boot-jd
Мобильная версия