У меня есть служба, которая опрашивает базу данных в рамках транзакции, всегда удаляя запись, а если во время обработки выдается исключение, добавляет новую запись, увеличивая «счетчик попыток» для этой записи. Это выглядит примерно так:
@Log4j2
@Profile("simple")
@EnableScheduling
@Component
@RequiredArgsConstructor
public class SimplePoller {
@Value("${outputDir}")
private final String outputDir;
private final NamedParameterJdbcTemplate template;
@Transactional
@Scheduled(fixedDelay = 1000)
void simplePoll() {
log.info("polling for file");
IncomingFile record = null;
try {
final List records = this.template.query("""
delete from incoming_file where ir_id =
(select ir_id from incoming_file
where ir_try_count < :tryMaxCount
limit 1 for update skip locked)
returning ir_id, ir_path, ir_try_count
""",
Map.of("tryMaxCount", 3),
(rs, rowNum) -> new IncomingFile(rs.getLong("ir_id"), rs.getString("ir_path"),rs.getInt("ir_try_count"))
);
if (records.isEmpty()) {
return;
}
record = records.getFirst();
// Business logic which may throw an exception e.g. IOException
log.info("Finished processing {}", record.path());
} catch (Exception e) {
// Here the record is re-queued within the same transaction
this.template.update("""
insert into incoming_file (ir_path, ir_try_count) values
(:path, :tryCount)
""",
Map.of("path", record.path(), "tryCount", record.tryCount() + 1));
}
}
private record IncomingFile(long id, String path, int tryCount) {
}
}
Основные моменты, на которые следует обратить внимание:
Записи всегда удаляются из таблицы независимо от того, создано исключение или нет.
'Повторная постановка в очередь' записи происходит в том же потоке и, следовательно, в пределах одной транзакции.
Единственный раз, когда транзакция может откатиться, — это если процесс остановлен, и приложение не фиксирует транзакцию.
Я хотел бы смоделировать это с помощью Spring Integration с IntegrationFlows:
final JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(jdbcOperationsProvider.getObject(),
"""
delete from incoming_file where ir_id =
(select ir_id from incoming_file
where ir_try_count < :tryMaxCount
limit 1 for update skip locked)
returning ir_id, ir_path, ir_try_count
""");
adapter.setSelectSqlParameterSource(new MapSqlParameterSource("tryMaxCount", this.maxTryCount));
adapter.setRowMapper((rs, rowNum) -> new IncomingFile(
rs.getLong("ir_id"),
rs.getString("ir_path"),
rs.getInt("ir_try_count")
));
final TransactionInterceptor interceptor = new TransactionInterceptorBuilder()
.transactionAttribute(new NoRollbackTransactionAttribute())
.transactionManager(transactionManager.getObject())
.build();
final StandardIntegrationFlow flow = IntegrationFlow
.from(adapter, c -> c.poller(Pollers
.fixedRate(Duration.ofSeconds(2))
.maxMessagesPerPoll(-1)
.errorChannel(ERROR_CHANNEL)
.transactional(interceptor)
.taskExecutor(jdbcTaskExecutor)))
.transform(IncomingFile.class, incomingFile ->
new GenericMessage(incomingFile.path(),
Map.of(INCOMING_FILE_RECORD_ID, incomingFile.id(), INCOMING_FILE_RECORD_TRY_COUNT, incomingFile.tryCount())))
.handle(fileService)
.get();
final IntegrationFlowContext.IntegrationFlowRegistration register = flowContext.registration(flow).register();
this.registrationList.add(register);
@Log4j2
private static class NoRollbackTransactionAttribute extends DefaultTransactionAttribute {
@Override
public boolean rollbackOn(Throwable ex) {
final boolean isSpringException;
if (ex instanceof MessagingExceptionWrapper mexw){
isSpringException = mexw.getCause().getClass().getPackage().getName().startsWith(ORG_SPRINGFRAMEWORK);
log.warn("MessagingWrapperException. {}, Exception type: {}", mexw.getCause().getMessage(), mexw.getCause().getClass());
log.warn("Rollback: {}", isSpringException);
return isSpringException;
}
isSpringException = ex.getClass().getPackage().getName().startsWith(ORG_SPRINGFRAMEWORK);
log.warn("Exception", ex);
log.warn("Rollback: {}", isSpringException);
return isSpringException;
}
Основная проблема, с которой я сталкиваюсь, — это обработка исключений, когда они создаются в бизнес-логике, то есть в fileService, и отсутствие потери «сообщений» в случае завершения работы. Я создал класс NoRollbackTransactionAttribute, пытаясь определить причину исключения. Чтобы соответствовать примеру без интеграции Spring, исключения бизнес-логики не должны откатываться, а должны повторно помещать сообщение в очередь. В обработчике ошибок произойдет повторная постановка в очередь. Но методrollbackOn() NotRollbackTransationAttribute вызывается перед обработчиком ошибок, что затрудняет определение правильного действия отката. Также в NoRollBackTransactionAttribute мне приходилось обрабатывать различные исключения, специфичные для Spring, что не идеально. В целом этот подход работает не очень хорошо, поскольку я обнаружил, что «сообщения» могут быть потеряны во время завершения работы.
Есть ли лучший подход к моделированию исходного кода с использованием интеграции Spring?
У меня есть служба, которая опрашивает базу данных в рамках транзакции, всегда удаляя запись, а если во время обработки выдается исключение, добавляет новую запись, увеличивая «счетчик попыток» для этой записи. Это выглядит примерно так: [code]@Log4j2 @Profile("simple") @EnableScheduling @Component @RequiredArgsConstructor public class SimplePoller { @Value("${outputDir}") private final String outputDir; private final NamedParameterJdbcTemplate template;
@Transactional @Scheduled(fixedDelay = 1000) void simplePoll() { log.info("polling for file"); IncomingFile record = null; try { final List records = this.template.query(""" delete from incoming_file where ir_id = (select ir_id from incoming_file where ir_try_count < :tryMaxCount limit 1 for update skip locked) returning ir_id, ir_path, ir_try_count """, Map.of("tryMaxCount", 3), (rs, rowNum) -> new IncomingFile(rs.getLong("ir_id"), rs.getString("ir_path"),rs.getInt("ir_try_count")) ); if (records.isEmpty()) { return; } record = records.getFirst(); // Business logic which may throw an exception e.g. IOException log.info("Finished processing {}", record.path()); } catch (Exception e) { // Here the record is re-queued within the same transaction this.template.update(""" insert into incoming_file (ir_path, ir_try_count) values (:path, :tryCount) """, Map.of("path", record.path(), "tryCount", record.tryCount() + 1)); }
}
private record IncomingFile(long id, String path, int tryCount) { } } [/code] Основные моменты, на которые следует обратить внимание: [list] [*]Записи всегда удаляются из таблицы независимо от того, создано исключение или нет. [*]'Повторная постановка в очередь' записи происходит в том же потоке и, следовательно, в пределах одной транзакции. [*]Единственный раз, когда транзакция может откатиться, — это если процесс остановлен, и приложение не фиксирует транзакцию. [/list] Я хотел бы смоделировать это с помощью Spring Integration с IntegrationFlows: [code]final JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(jdbcOperationsProvider.getObject(), """ delete from incoming_file where ir_id = (select ir_id from incoming_file where ir_try_count < :tryMaxCount limit 1 for update skip locked) returning ir_id, ir_path, ir_try_count """); adapter.setSelectSqlParameterSource(new MapSqlParameterSource("tryMaxCount", this.maxTryCount)); adapter.setRowMapper((rs, rowNum) -> new IncomingFile( rs.getLong("ir_id"), rs.getString("ir_path"), rs.getInt("ir_try_count") ));
final TransactionInterceptor interceptor = new TransactionInterceptorBuilder() .transactionAttribute(new NoRollbackTransactionAttribute()) .transactionManager(transactionManager.getObject()) .build(); final StandardIntegrationFlow flow = IntegrationFlow .from(adapter, c -> c.poller(Pollers .fixedRate(Duration.ofSeconds(2)) .maxMessagesPerPoll(-1) .errorChannel(ERROR_CHANNEL) .transactional(interceptor) .taskExecutor(jdbcTaskExecutor))) .transform(IncomingFile.class, incomingFile -> new GenericMessage(incomingFile.path(), Map.of(INCOMING_FILE_RECORD_ID, incomingFile.id(), INCOMING_FILE_RECORD_TRY_COUNT, incomingFile.tryCount()))) .handle(fileService) .get(); final IntegrationFlowContext.IntegrationFlowRegistration register = flowContext.registration(flow).register(); this.registrationList.add(register);
@Log4j2 private static class NoRollbackTransactionAttribute extends DefaultTransactionAttribute { @Override public boolean rollbackOn(Throwable ex) { final boolean isSpringException;
[/code] Основная проблема, с которой я сталкиваюсь, — это обработка исключений, когда они создаются в бизнес-логике, то есть в fileService, и отсутствие потери «сообщений» в случае завершения работы. Я создал класс NoRollbackTransactionAttribute, пытаясь определить причину исключения. Чтобы соответствовать примеру без интеграции Spring, исключения бизнес-логики не должны откатываться, а должны повторно помещать сообщение в очередь. В обработчике ошибок произойдет повторная постановка в очередь. Но методrollbackOn() NotRollbackTransationAttribute вызывается перед обработчиком ошибок, что затрудняет определение правильного действия отката. Также в NoRollBackTransactionAttribute мне приходилось обрабатывать различные исключения, специфичные для Spring, что не идеально. В целом этот подход работает не очень хорошо, поскольку я обнаружил, что «сообщения» могут быть потеряны во время завершения работы. Есть ли лучший подход к моделированию исходного кода с использованием интеграции Spring?