Исключение внутри TransactionTemplate в CompletableFuture.runAsync не достигает внешнего блока try/catchJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Исключение внутри TransactionTemplate в CompletableFuture.runAsync не достигает внешнего блока try/catch

Сообщение Anonymous »

Прошу прощения за незнание. Я только что закончил школу и пытаюсь создать (полагаю, используя клейкую ленту и клей) приложение Spring Boot для анализа больших файлов pcap, хранения метаданных файлов и анализируемых сообщений в PostgreSQL с использованием JPA/Hibernate. Метаданные каждого файла хранятся в таблице «pcap_files», и каждый файл может содержать тысячи проанализированных сообщений pcap в таблице «pcap_messages».
Поскольку это довольно новый проект, я хотел использовать Java 21 с виртуальными потоками, поскольку внутри приложения много операций ввода-вывода. Приложение может одновременно выполнять запросы на анализ очень больших файлов. Парсинг запускается асинхронно в соответствующем разделе:
@Service
//@RequiredArgsConstructor
public class PcapProcessingService{

private final Logger logger = LoggerFactory.getLogger(PcapProcessingService.class);

private static final int BATCH_SIZE = 1000;
private static final int FLUSH_CHUNK = 250;
private static final long POLL_TIMEOUT_MS = 500;

private final PcapMessageMapper pcapMessageMapper;
private final PcapParserFactory pcapParserFactory;

private final PcapFileRepository pcapFileRepository;

private final TaskExecutor vPcapConsumerExecutor;
private final PlatformTransactionManager txManager;

@PersistenceContext
private EntityManager em;

public PcapProcessingService(PcapMessageMapper pcapMessageMapper,
PcapParserFactory pcapParserFactory,
PcapFileRepository pcapFileRepository,
@Qualifier("vPcapConsumerExecutor") TaskExecutor vPcapConsumerExecutor,
PlatformTransactionManager txManager) {
this.pcapMessageMapper = pcapMessageMapper;
this.pcapParserFactory = pcapParserFactory;
this.pcapFileRepository = pcapFileRepository;
this.vPcapConsumerExecutor = vPcapConsumerExecutor;
this.txManager = txManager;
}

@Async("vTaskExecutor")
public void startParsingProcess(UUID fileUuid, String absolutePath){
logger.info("fileUuid: {} absolutePath: {}", fileUuid, absolutePath);

final PcapFile pcapFile = pcapFileRepository.findByFileUuid(fileUuid)
.orElseThrow(() -> new IllegalArgumentException("No file with uuid: " + fileUuid));
final Long fileId = pcapFile.getId();

markStarted(fileId);

PcapParser parser = pcapParserFactory.fromPath(absolutePath);

final long[] totalPacketsEmitted = {0L};
final long[] totalMessagesPersisted = {0L};

CompletableFuture producer = CompletableFuture.runAsync(() -> {
try{
parser.parsePcapFile();
} catch(Exception e){
throw new RuntimeException("PCAP producer failed", e);
}
}, vPcapConsumerExecutor);

CompletableFuture consumer = CompletableFuture.runAsync(() -> {

final List
buffer = new ArrayList(BATCH_SIZE);

try {
while(true){

if (Thread.currentThread().isInterrupted()){
throw new InterruptedException("Consumer interrupted");
}

PacketDetail detail = parser.getPackets().poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);

if(detail != null){
totalPacketsEmitted[0]++;

logger.info(" ");
logger.info("Detail: toString:{} messageList:{} packetType:{} payloadLength:{} isFragmented:{} nextSeqNum:{}", detail.toString(), detail.getMessageListInPacket(), detail.getPacketType(), detail.getPayloadLength(), detail.isFragmanted(), detail.getNextSequenceNum());

List messages = detail.getMessagePropertyListForParser();
if(messages!=null && !messages.isEmpty()){

for(MessagePropertyForParser mp : messages){

logger.info("Message property: time:{} srcIp:{} dstIp:{} srcPort:{} dstPort:{} rawMsg:{}", mp.getTime(), mp.getSourceIP(), mp.getDestIP(), mp.getSourcePort(), mp.getDestPort(), mp.getInputRawMsg());

PcapMessage entity = pcapMessageMapper.map(mp, detail.getPayloadLength(), detail.getPacketType());

buffer.add(entity);

if(buffer.size() >= BATCH_SIZE) {
logger.info("Persisting batch mid-parsing");
int persisted = persistBatch(fileId, buffer);

totalMessagesPersisted[0] += persisted;
buffer.clear();
}
}
}
}
if(parser.isParsingCompleted.get() && parser.getPackets().isEmpty()){
logger.info("Parsing completed");
break;
}
}
if(!buffer.isEmpty()){
logger.info("Proceeding to persisting");
int persisted = persistBatch(fileId, buffer);

totalMessagesPersisted[0] += persisted;
buffer.clear();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("PCAP consumer interrupted", e);
} catch (Exception e) {
Thread.currentThread().interrupt();
throw new RuntimeException("PCAP consumer unexpected exception", e);
}
}, vPcapConsumerExecutor);

try {
CompletableFuture.allOf(producer, consumer).join();

markCompleted(fileId, totalPacketsEmitted[0], totalMessagesPersisted[0]);

logger.info("Pcap parsing for fileUuid: {} completed, packets: {}, messages: {}", fileUuid, totalPacketsEmitted[0], totalMessagesPersisted[0]);
} catch (CompletionException ce){
Throwable cause = ce.getCause() != null ? ce.getCause() : ce;

logger.error("Pcap parsing for fileUuid: {} failed", fileUuid, cause);
markFailed(fileId, cause);

producer.cancel(true);
consumer.cancel(true);
}
}

private void markStarted(Long fileId) {
pcapFileRepository.findById(fileId).ifPresent(f -> {
f.setStartedAt(Instant.now());
f.setStatus(ProcessingStatus.PROCESSING);
f.setErrorMessage(null);
pcapFileRepository.save(f);
});
} //Other methods are similar

private void markCompleted(Long fileId, long totalPackets, totalClassified){/*set status=COMPLETED for fileId*/}
private void markFailed(Long fileId, Throwable t){/*set status=FAILED, errorMessage=..*/}

//see persistBatch method below

Я предоставляю такие темы:
@Bean(name = "vTaskExecutor")
public TaskExecutor vTaskExecutor(){
Executor vtp = Executors.newVirtualThreadPerTaskExecutor();
return new TaskExecutorAdapter(vtp);
}

Фактическая запись в БД выполняется в persistBatch с использованием TransactionTemplate и EntityManager для управления сбросом/очисткой больших пакетов.
private int persistBatch(final Long fileId, final List
batch) {
if(batch.isEmpty()) return 0;

TransactionTemplate tx = new TransactionTemplate(txManager);
tx.executeWithoutResult(status -> {
PcapFile managedFileRef = em.getReference(PcapFile.class, fileId);

logger.info("Managed file: {}", managedFileRef.getOriginalFilename());

int i = 0;
for (PcapMessage pm : batch) {
logger.info("Setting pcap file for message: {}", managedFileRef.getOriginalFilename());
pm.setPcapFile(managedFileRef);
logger.info("Persisting message");
em.persist(pm);
logger.info("Message persisted");

i++;
if (i % FLUSH_CHUNK == 0) {
logger.info("Flushing chunk");
em.flush();
em.clear();
logger.info("Completed flushing");

managedFileRef = em.getReference(PcapFile.class, fileId);
}
}
em.flush();
em.clear();
});
return batch.size();
}

Теперь реальная программа отлично работает с этими требованиями, если нет проблем с процессом пакетного сохранения. Однако, если что-то пойдет не так во время persistBatch (например, иногда mp.getInputRawMessage включает в себя нулевые символы), что приводит к ошибке сохранения в em.flush():
  • Я вижу исключение в журналах Hibernate
  • Никакие сообщения не сохраняются в базе данных, и это хорошо.
  • Но исключения в startParsingProcess, вероятно, не перехватываются, и markFailed не вызывается.
  • Строка pcap_files остается в состоянии PROCESSING без сообщений об ошибках.
Я пробовал обертывать методы сброса, tx.executeWithoutResult, вызовы методов persistBatch с блоками try/catch и повторно выбрасывать RuntimeExceptions, но я все еще не удалось перехватить исключение и сохранить файл как FAILED. Я явно неправильно понимаю настойчивость и темы, и я не мог делать то, что хотел. Конечно, я могу легко обнаружить нулевые символы в строке и заменить их, но я пытаюсь понять, что здесь происходит на самом деле.
Вот журналы ошибок:
Hibernate:
update
pcap_files
set
completed_at=?,
error_message=?,
is_replayable=?,
original_filename=?,
size_bytes=?,
started_at=?,
status=?,
stored_filename=?,
total_classified=?,
total_packets=?,
uploaded_at=?,
uploader_id=?
where
id=?
2025-11-19T14:49:01.498+03:00 INFO 27852 --- [ virtual-77] org.hibernate.orm.jdbc.batch : HHH100503: On release of batch it still contained JDBC statements
2025-11-19T14:49:01.498+03:00 WARN 27852 --- [ virtual-77] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: 22021
2025-11-19T14:49:01.498+03:00 ERROR 27852 --- [ virtual-77] o.h.engine.jdbc.spi.SqlExceptionHelper : Batch entry 0 update pcap_files set completed_at=('2025-11-19 11:49:01.489409+00'),error_message=('could not execute batch [Batch entry 0 insert into pcap_messages (...) values ...
Where: unnamed portal parameter $2 Call getNextException to see other errors in the batch.
2025-11-19T14:49:01.498+03:00 ERROR 27852 --- [ virtual-77] o.h.engine.jdbc.spi.SqlExceptionHelper : ERROR: invalid byte sequence for encoding "UTF8": 0x00
Where: unnamed portal parameter $2
2025-11-19T14:49:01.501+03:00 INFO 27852 --- [ virtual-77] i.StatisticalLoggingSessionEventListener : Session Metrics {
26701 nanoseconds spent acquiring 1 JDBC connections;
0 nanoseconds spent releasing 0 JDBC connections;
124101 nanoseconds spent preparing 2 JDBC statements;
1367899 nanoseconds spent executing 1 JDBC statements;
2154500 nanoseconds spent executing 1 JDBC batches;
0 nanoseconds spent performing 0 L2C puts;
0 nanoseconds spent performing 0 L2C hits;
0 nanoseconds spent performing 0 L2C misses;
4916201 nanoseconds spent executing 1 flushes (flushing a total of 1 entities and 0 collections);
0 nanoseconds spent executing 0 pre-partial-flushes;
0 nanoseconds spent executing 0 partial-flushes (flushing a total of 0 entities and 0 collection


Подробнее здесь: https://stackoverflow.com/questions/798 ... s-not-reac
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»