Как преобразовать службу опроса базы данных в Spring IntegrationFlowJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как преобразовать службу опроса базы данных в Spring IntegrationFlow

Сообщение Anonymous »

У меня есть служба, которая опрашивает базу данных в рамках транзакции, всегда удаляя запись, а если во время обработки выдается исключение, добавляет новую запись, увеличивая «счетчик попыток» для этой записи. Это выглядит примерно так:

Код: Выделить всё

@Log4j2
@Profile("simple")
@EnableScheduling
@Component
@RequiredArgsConstructor
public class SimplePoller {
@Value("${outputDir}")
private final String outputDir;
private final NamedParameterJdbcTemplate template;

@Transactional
@Scheduled(fixedDelay = 1000)
void simplePoll() {
log.info("polling for file");
IncomingFile record = null;
try {
final List records = this.template.query("""
delete from incoming_file where ir_id =
(select ir_id from incoming_file
where ir_try_count < :tryMaxCount
limit 1 for update skip locked)
returning ir_id, ir_path, ir_try_count
""",
Map.of("tryMaxCount", 3),
(rs, rowNum) -> new IncomingFile(rs.getLong("ir_id"), rs.getString("ir_path"),rs.getInt("ir_try_count"))
);
if (records.isEmpty()) {
return;
}
record = records.getFirst();
// Business logic which may throw an exception e.g.  IOException
log.info("Finished processing {}", record.path());
} catch (Exception e) {
// Here the record is re-queued within the same transaction
this.template.update("""
insert into incoming_file (ir_path, ir_try_count) values
(:path, :tryCount)
""",
Map.of("path", record.path(), "tryCount", record.tryCount() + 1));
}

}

private record IncomingFile(long id, String path, int tryCount) {
}
}
Основные моменты, на которые следует обратить внимание:
  • Записи всегда удаляются из таблицы независимо от того, создано исключение или нет.
  • 'Повторная постановка в очередь' записи происходит в том же потоке и, следовательно, в пределах одной транзакции.
  • Единственный раз, когда транзакция может откатиться, — это если процесс остановлен, и приложение не фиксирует транзакцию.
Я хотел бы смоделировать это с помощью Spring Integration с IntegrationFlows:

Код: Выделить всё

final JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(jdbcOperationsProvider.getObject(),
"""
delete from incoming_file where ir_id =
(select ir_id from incoming_file
where ir_try_count < :tryMaxCount
limit 1 for update skip locked)
returning ir_id, ir_path, ir_try_count
""");
adapter.setSelectSqlParameterSource(new MapSqlParameterSource("tryMaxCount", this.maxTryCount));
adapter.setRowMapper((rs, rowNum) -> new IncomingFile(
rs.getLong("ir_id"),
rs.getString("ir_path"),
rs.getInt("ir_try_count")
));

final TransactionInterceptor interceptor = new TransactionInterceptorBuilder()
.transactionAttribute(new NoRollbackTransactionAttribute())
.transactionManager(transactionManager.getObject())
.build();
final StandardIntegrationFlow flow = IntegrationFlow
.from(adapter, c -> c.poller(Pollers
.fixedRate(Duration.ofSeconds(2))
.maxMessagesPerPoll(-1)
.errorChannel(ERROR_CHANNEL)
.transactional(interceptor)
.taskExecutor(jdbcTaskExecutor)))
.transform(IncomingFile.class, incomingFile ->
new GenericMessage(incomingFile.path(),
Map.of(INCOMING_FILE_RECORD_ID, incomingFile.id(), INCOMING_FILE_RECORD_TRY_COUNT, incomingFile.tryCount())))
.handle(fileService)
.get();
final IntegrationFlowContext.IntegrationFlowRegistration register = flowContext.registration(flow).register();
this.registrationList.add(register);

@Log4j2
private static class NoRollbackTransactionAttribute extends DefaultTransactionAttribute {
@Override
public boolean rollbackOn(Throwable ex) {
final boolean isSpringException;

if (ex instanceof MessagingExceptionWrapper mexw){
isSpringException = mexw.getCause().getClass().getPackage().getName().startsWith(ORG_SPRINGFRAMEWORK);
log.warn("MessagingWrapperException. {}, Exception type: {}", mexw.getCause().getMessage(), mexw.getCause().getClass());
log.warn("Rollback: {}", isSpringException);
return isSpringException;
}
isSpringException =  ex.getClass().getPackage().getName().startsWith(ORG_SPRINGFRAMEWORK);
log.warn("Exception", ex);
log.warn("Rollback: {}", isSpringException);
return isSpringException;
}

Основная проблема, с которой я сталкиваюсь, — это обработка исключений, когда они создаются в бизнес-логике, то есть в fileService, и отсутствие потери «сообщений» в случае завершения работы. Я создал класс NoRollbackTransactionAttribute, пытаясь определить причину исключения. Чтобы соответствовать примеру без интеграции Spring, исключения бизнес-логики не должны откатываться, а должны повторно помещать сообщение в очередь. В обработчике ошибок произойдет повторная постановка в очередь. Но методrollbackOn() NotRollbackTransationAttribute вызывается перед обработчиком ошибок, что затрудняет определение правильного действия отката. Также в NoRollBackTransactionAttribute мне приходилось обрабатывать различные исключения, специфичные для Spring, что не идеально. В целом этот подход работает не очень хорошо, поскольку я обнаружил, что «сообщения» могут быть потеряны во время завершения работы.
Есть ли лучший подход к моделированию исходного кода с использованием интеграции Spring?

Подробнее здесь: https://stackoverflow.com/questions/798 ... rationflow
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»