Цель состоит в том, чтобы выполнить 3 задачи в последовательности: рассматривать их как одноразовое, когда приложение начинается - например, очистка данных, преобразование и нагрузка. Ниже приведен пример игрушки для демонстрации: < /p>
Задача 1: имеет вызовы к методам блокировки, такими как загрузка данных из ведер GCP. < /P>
< /li>
Задача 2: не блокирующее преобразование данных < /p>
< /li>
Задача 3: Загрузка данных в Postgres DB с использованием Hibernate Reactive Panach. рабочая ветка. Использование RunSubScriptionOn или есть лучший подход, включая изменение дизайна. Как сохранить задачу 2 и задачу 3 в исходном потоке? , это приведет к проблемам Hibernate Reactive Panache, поскольку он нуждается в петле событий Vertx. Есть ли лучший способ? < /P>
< /li>
< /ul>
Пожалуйста, посоветуйте, есть ли лучший способ сделать это, учитывая упор. (Попытка перейти к реактивному подходу, но на некоторое время застряла по этому вопросу. Открыть для всех предложений.) < /P>
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import org.jboss.logging.Logger;
@ApplicationScoped
public class QuarkusBlockingExample
{
private static final Logger logger = Logger
.getLogger(QuarkusBlockingExample.class);
void onStart(@Observes StartupEvent ev)
{
logger.info("Log 1: QuarkusBlockingExample onStart is starting...");
executeDataPipeline().subscribe().with(
success -> logger.info("Log 5: executeDataPipeline completed."),
failure -> logger.error("Log 5f: executeDataPipeline failed",
failure));
logger.info("Log 8: QuarkusBlockingExample onStart is finished.");
}
public Uni executeDataPipeline()
{
logger.info("Log 2: Starting executeDataPipeline");
return Uni.createFrom().voidItem().onItem()
.transformToUni(unused -> task1WithBlocking()).onItem()
.transformToUni(unused -> task2NonBlocking()).onItem()
.transformToUni(unused -> task3NonBlockingDB()).onItem()
.invoke(() -> logger
.info("Log 7: Processing initialisation complete."));
}
public Uni task1WithBlocking()
{
return Uni.createFrom().voidItem().invoke(this::blockingMethod)
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
.invoke(() -> logger
.info("Log 4: Completed task1WithBlocking."))
.replaceWithVoid();
}
public Uni task2NonBlocking()
{
return Uni.createFrom().voidItem().invoke(
() -> logger.info("Log 5: Completed task2NonBlocking."));
}
public Uni task3NonBlockingDB()
{
return Uni.createFrom().voidItem().invoke(() -> logger.info(
"Log 6: Completed DB load using Reactive Hibernate Panache."));
}
private void blockingMethod()
{
try
{
Thread.sleep(2000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
logger.info("Log 3: Blocking method executed.");
}
}
Подробнее здесь: https://stackoverflow.com/questions/793 ... ext-thread
Quarkus with Mutiny: поиск советов по запуску блокировки кода и контекста/управления потоками? ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Помощь в проекте сжатия данных, поиск советов/предложений о том, как двигаться дальше. Джава
Anonymous » » в форуме JAVA - 0 Ответы
- 21 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Поиск советов по эффективным операциям Pandas для условного суммирования
Anonymous » » в форуме Python - 0 Ответы
- 2 Просмотры
-
Последнее сообщение Anonymous
-