Поскольку это довольно новый проект, я хотел использовать Java 21 с виртуальными потоками, поскольку внутри приложения много операций ввода-вывода. Приложение может одновременно выполнять запросы на анализ очень больших файлов. Парсинг запускается асинхронно в соответствующем разделе:
@Service
//@RequiredArgsConstructor
public class PcapProcessingService{
private final Logger logger = LoggerFactory.getLogger(PcapProcessingService.class);
private static final int BATCH_SIZE = 1000;
private static final int FLUSH_CHUNK = 250;
private static final long POLL_TIMEOUT_MS = 500;
private final PcapMessageMapper pcapMessageMapper;
private final PcapParserFactory pcapParserFactory;
private final PcapFileRepository pcapFileRepository;
private final TaskExecutor vPcapConsumerExecutor;
private final PlatformTransactionManager txManager;
@PersistenceContext
private EntityManager em;
public PcapProcessingService(PcapMessageMapper pcapMessageMapper,
PcapParserFactory pcapParserFactory,
PcapFileRepository pcapFileRepository,
@Qualifier("vPcapConsumerExecutor") TaskExecutor vPcapConsumerExecutor,
PlatformTransactionManager txManager) {
this.pcapMessageMapper = pcapMessageMapper;
this.pcapParserFactory = pcapParserFactory;
this.pcapFileRepository = pcapFileRepository;
this.vPcapConsumerExecutor = vPcapConsumerExecutor;
this.txManager = txManager;
}
@Async("vTaskExecutor")
public void startParsingProcess(UUID fileUuid, String absolutePath){
logger.info("fileUuid: {} absolutePath: {}", fileUuid, absolutePath);
final PcapFile pcapFile = pcapFileRepository.findByFileUuid(fileUuid)
.orElseThrow(() -> new IllegalArgumentException("No file with uuid: " + fileUuid));
final Long fileId = pcapFile.getId();
markStarted(fileId);
PcapParser parser = pcapParserFactory.fromPath(absolutePath);
final long[] totalPacketsEmitted = {0L};
final long[] totalMessagesPersisted = {0L};
CompletableFuture producer = CompletableFuture.runAsync(() -> {
try{
parser.parsePcapFile();
} catch(Exception e){
throw new RuntimeException("PCAP producer failed", e);
}
}, vPcapConsumerExecutor);
CompletableFuture consumer = CompletableFuture.runAsync(() -> {
final List
buffer = new ArrayList(BATCH_SIZE);
try {
while(true){
if (Thread.currentThread().isInterrupted()){
throw new InterruptedException("Consumer interrupted");
}
PacketDetail detail = parser.getPackets().poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if(detail != null){
totalPacketsEmitted[0]++;
logger.info(" ");
logger.info("Detail: toString:{} messageList:{} packetType:{} payloadLength:{} isFragmented:{} nextSeqNum:{}", detail.toString(), detail.getMessageListInPacket(), detail.getPacketType(), detail.getPayloadLength(), detail.isFragmanted(), detail.getNextSequenceNum());
List messages = detail.getMessagePropertyListForParser();
if(messages!=null && !messages.isEmpty()){
for(MessagePropertyForParser mp : messages){
logger.info("Message property: time:{} srcIp:{} dstIp:{} srcPort:{} dstPort:{} rawMsg:{}", mp.getTime(), mp.getSourceIP(), mp.getDestIP(), mp.getSourcePort(), mp.getDestPort(), mp.getInputRawMsg());
PcapMessage entity = pcapMessageMapper.map(mp, detail.getPayloadLength(), detail.getPacketType());
buffer.add(entity);
if(buffer.size() >= BATCH_SIZE) {
logger.info("Persisting batch mid-parsing");
int persisted = persistBatch(fileId, buffer);
totalMessagesPersisted[0] += persisted;
buffer.clear();
}
}
}
}
if(parser.isParsingCompleted.get() && parser.getPackets().isEmpty()){
logger.info("Parsing completed");
break;
}
}
if(!buffer.isEmpty()){
logger.info("Proceeding to persisting");
int persisted = persistBatch(fileId, buffer);
totalMessagesPersisted[0] += persisted;
buffer.clear();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("PCAP consumer interrupted", e);
} catch (Exception e) {
Thread.currentThread().interrupt();
throw new RuntimeException("PCAP consumer unexpected exception", e);
}
}, vPcapConsumerExecutor);
try {
CompletableFuture.allOf(producer, consumer).join();
markCompleted(fileId, totalPacketsEmitted[0], totalMessagesPersisted[0]);
logger.info("Pcap parsing for fileUuid: {} completed, packets: {}, messages: {}", fileUuid, totalPacketsEmitted[0], totalMessagesPersisted[0]);
} catch (CompletionException ce){
Throwable cause = ce.getCause() != null ? ce.getCause() : ce;
logger.error("Pcap parsing for fileUuid: {} failed", fileUuid, cause);
markFailed(fileId, cause);
producer.cancel(true);
consumer.cancel(true);
}
}
private void markStarted(Long fileId) {
pcapFileRepository.findById(fileId).ifPresent(f -> {
f.setStartedAt(Instant.now());
f.setStatus(ProcessingStatus.PROCESSING);
f.setErrorMessage(null);
pcapFileRepository.save(f);
});
} //Other methods are similar
private void markCompleted(Long fileId, long totalPackets, totalClassified){/*set status=COMPLETED for fileId*/}
private void markFailed(Long fileId, Throwable t){/*set status=FAILED, errorMessage=..*/}
//see persistBatch method below
Я предоставляю такие темы:
@Bean(name = "vTaskExecutor")
public TaskExecutor vTaskExecutor(){
Executor vtp = Executors.newVirtualThreadPerTaskExecutor();
return new TaskExecutorAdapter(vtp);
}
Фактическая запись в БД выполняется в persistBatch с использованием TransactionTemplate и EntityManager для управления сбросом/очисткой больших пакетов.
private int persistBatch(final Long fileId, final List
batch) {
if(batch.isEmpty()) return 0;
TransactionTemplate tx = new TransactionTemplate(txManager);
tx.executeWithoutResult(status -> {
PcapFile managedFileRef = em.getReference(PcapFile.class, fileId);
logger.info("Managed file: {}", managedFileRef.getOriginalFilename());
int i = 0;
for (PcapMessage pm : batch) {
logger.info("Setting pcap file for message: {}", managedFileRef.getOriginalFilename());
pm.setPcapFile(managedFileRef);
logger.info("Persisting message");
em.persist(pm);
logger.info("Message persisted");
i++;
if (i % FLUSH_CHUNK == 0) {
logger.info("Flushing chunk");
em.flush();
em.clear();
logger.info("Completed flushing");
managedFileRef = em.getReference(PcapFile.class, fileId);
}
}
em.flush();
em.clear();
});
return batch.size();
}
Теперь реальная программа отлично работает с этими требованиями, если нет проблем с процессом пакетного сохранения. Однако, если что-то пойдет не так во время persistBatch (например, иногда mp.getInputRawMessage включает в себя нулевые символы), что приводит к ошибке сохранения в em.flush():
- Я вижу исключение в журналах Hibernate
- Никакие сообщения не сохраняются в базе данных, и это хорошо.
- Но исключения в startParsingProcess, вероятно, не перехватываются, и markFailed не вызывается.
- Строка pcap_files остается в состоянии PROCESSING без сообщений об ошибках.
Вот журналы ошибок:
Hibernate:
update
pcap_files
set
completed_at=?,
error_message=?,
is_replayable=?,
original_filename=?,
size_bytes=?,
started_at=?,
status=?,
stored_filename=?,
total_classified=?,
total_packets=?,
uploaded_at=?,
uploader_id=?
where
id=?
2025-11-19T14:49:01.498+03:00 INFO 27852 --- [ virtual-77] org.hibernate.orm.jdbc.batch : HHH100503: On release of batch it still contained JDBC statements
2025-11-19T14:49:01.498+03:00 WARN 27852 --- [ virtual-77] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: 22021
2025-11-19T14:49:01.498+03:00 ERROR 27852 --- [ virtual-77] o.h.engine.jdbc.spi.SqlExceptionHelper : Batch entry 0 update pcap_files set completed_at=('2025-11-19 11:49:01.489409+00'),error_message=('could not execute batch [Batch entry 0 insert into pcap_messages (...) values ...
Where: unnamed portal parameter $2 Call getNextException to see other errors in the batch.
2025-11-19T14:49:01.498+03:00 ERROR 27852 --- [ virtual-77] o.h.engine.jdbc.spi.SqlExceptionHelper : ERROR: invalid byte sequence for encoding "UTF8": 0x00
Where: unnamed portal parameter $2
2025-11-19T14:49:01.501+03:00 INFO 27852 --- [ virtual-77] i.StatisticalLoggingSessionEventListener : Session Metrics {
26701 nanoseconds spent acquiring 1 JDBC connections;
0 nanoseconds spent releasing 0 JDBC connections;
124101 nanoseconds spent preparing 2 JDBC statements;
1367899 nanoseconds spent executing 1 JDBC statements;
2154500 nanoseconds spent executing 1 JDBC batches;
0 nanoseconds spent performing 0 L2C puts;
0 nanoseconds spent performing 0 L2C hits;
0 nanoseconds spent performing 0 L2C misses;
4916201 nanoseconds spent executing 1 flushes (flushing a total of 1 entities and 0 collections);
0 nanoseconds spent executing 0 pre-partial-flushes;
0 nanoseconds spent executing 0 partial-flushes (flushing a total of 0 entities and 0 collection
Подробнее здесь: https://stackoverflow.com/questions/798 ... s-not-reac
Мобильная версия