- Задача 1: содержит вызовы методов блокировки, таких как загрузка данных из сегментов GCP.
- Задача 2: неблокирующее преобразование данных
- Задача 3: есть загрузка данных в базу данных Postgres с помощью Hibernate Reactive Panache.
- Q1. Каков рекомендуемый способ выполнения Task1WithBlocking в рабочем потоке. используя runSubscriptionOn или есть лучший подход, включая изменение дизайна.
- Вопрос 2: Когда Task1WithBlocking помещается в поток исполнителя, как сохранить задачу 2 и задачу 3 в исходном потоке?
- Q3: OnStart запускается в основном потоке Quarkus , это вызовет проблемы с Hibernate Reactive Panache. так как для этого нужен цикл событий vertx. Есть ли лучший способ?
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import org.jboss.logging.Logger;
@ApplicationScoped
public class QuarkusBlockingExample
{
private static final Logger logger = Logger
.getLogger(QuarkusBlockingExample.class);
void onStart(@Observes StartupEvent ev)
{
logger.info("Log 1: QuarkusBlockingExample onStart is starting...");
executeDataPipeline().subscribe().with(
success -> logger.info("Log 5: executeDataPipeline completed."),
failure -> logger.error("Log 5f: executeDataPipeline failed",
failure));
logger.info("Log 8: QuarkusBlockingExample onStart is finished.");
}
public Uni executeDataPipeline()
{
logger.info("Log 2: Starting executeDataPipeline");
return Uni.createFrom().voidItem().onItem()
.transformToUni(unused -> task1WithBlocking()).onItem()
.transformToUni(unused -> task2NonBlocking()).onItem()
.transformToUni(unused -> task3NonBlockingDB()).onItem()
.invoke(() -> logger
.info("Log 7: Processing initialisation complete."));
}
public Uni task1WithBlocking()
{
return Uni.createFrom().voidItem().invoke(this::blockingMethod)
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
.invoke(() -> logger
.info("Log 4: Completed task1WithBlocking."))
.replaceWithVoid();
}
public Uni task2NonBlocking()
{
return Uni.createFrom().voidItem().invoke(
() -> logger.info("Log 5: Completed task2NonBlocking."));
}
public Uni task3NonBlockingDB()
{
return Uni.createFrom().voidItem().invoke(() -> logger.info(
"Log 6: Completed DB load using Reactive Hibernate Panache."));
}
private void blockingMethod()
{
try
{
Thread.sleep(2000);
} catch (InterruptedException e)
{
e.printStackTrace();
}
logger.info("Log 3: Blocking method executed.");
}
}
Подробнее здесь: https://stackoverflow.com/questions/793 ... ext-thread