Как асинхронно запустить метод в реактивной цепочке в Spring WebFlux?JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как асинхронно запустить метод в реактивной цепочке в Spring WebFlux?

Сообщение Anonymous »

Я пытаюсь выполнить метод асинхронно в существующей реактивной цепочке в моем приложении на основе Project Reactor. Метод doUpdateLayoutInAsync предназначен для выполнения тяжелой фоновой задачи, но похоже, что мой подход не работает должным образом. Вот моя текущая реализация:

Код: Выделить всё

public Mono publishPackage(String branchedPackageId) {
PackagePublishingMetaDTO publishingMetaDTO = new PackagePublishingMetaDTO();
publishingMetaDTO.setPublishEvent(true);

return packageRepository
.findById(branchedPackageId, packagePermission.getPublishPermission())
.switchIfEmpty(Mono.error(new AppsmithException(
AppsmithError.ACL_NO_RESOURCE_FOUND, FieldName.PACKAGE_ID, branchedPackageId)))
.flatMap(originalPackage -> {
String nextVersion = PackageUtils.getNextVersion(originalPackage.getVersion());

Package packageToBePublished = constructPackageToBePublished(originalPackage);

originalPackage.setVersion(nextVersion);
originalPackage.setLastPublishedAt(packageToBePublished.getLastPublishedAt());
publishingMetaDTO.setOriginPackageId(branchedPackageId);
publishingMetaDTO.setWorkspaceId(originalPackage.getWorkspaceId());

Mono unsetCurrentLatestMono = packageRepository.unsetLatestPackageByOriginId(originalPackage.getId(), null);
Mono
 saveOriginalPackage = packageRepository.save(originalPackage);
Mono savePackageToBePublished = packageRepository.save(packageToBePublished);

return unsetCurrentLatestMono
.then(Mono.zip(saveOriginalPackage, savePackageToBePublished))
.flatMap(tuple2 -> {
Package publishedPackage = tuple2.getT2();
publishingMetaDTO.setPublishedPackage(publishedPackage);

return modulePackagePublishableService
.publishEntities(publishingMetaDTO)
.flatMap(publishedModules -> {
if (publishedModules.isEmpty()) {
return Mono.error(new AppsmithException(
AppsmithError.PACKAGE_CANNOT_BE_PUBLISHED,
originalPackage.getUnpublishedPackage().getName()));
}
return moduleInstancePackagePublishableService
.publishEntities(publishingMetaDTO)
.then(Mono.defer(() ->
newActionPackagePublishableService.publishEntities(publishingMetaDTO))
.then(Mono.defer(() ->
actionCollectionPackagePublishableService
.publishEntities(publishingMetaDTO))));
})
.then(Mono.defer(() -> autoUpgradeService.handleAutoUpgrade(publishingMetaDTO)));
})
.as(transactionalOperator::transactional)
.then(Mono.defer(() -> doUpdateLayoutInAsync(publishingMetaDTO)));
});
}

private Mono doUpdateLayoutInAsync(PackagePublishingMetaDTO publishingMetaDTO) {
Mono updateLayoutsMono = Flux.fromIterable(publishingMetaDTO.getAutoUpgradedPageIds())
.flatMap(pageId -> updateLayoutService
.updatePageLayoutsByPageId(pageId)
.onErrorResume(throwable -> {
log.warn("Update layout failed for pageId: {} with error: {}", pageId, throwable.getMessage());
return Mono.just(pageId);
}))
.collectList();

// Running the updateLayoutsMono task asynchronously
updateLayoutsMono.subscribeOn(Schedulers.boundedElastic()).subscribe();

return Mono.just(Boolean.TRUE);
}
Проблема: я хочу, чтобы doUpdateLayoutInAsync работал в фоновом режиме, пока завершается остальная часть реактивной цепочки. Однако метод, похоже, выполняется синхронно, и реактивная цепочка не продолжается должным образом.
Вопрос: Как я могу гарантировать выполнение doUpdateLayoutInAsync асинхронно и не блокирует продолжение реактивной цепочки?

Подробнее здесь: https://stackoverflow.com/questions/787 ... ng-webflux
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»