Код: Выделить всё
fun main(args: Array) {
val start = System.currentTimeMillis()
Internal().doWork()
println("Duration is ${(System.currentTimeMillis() - start)/1000} sec")
}
class Internal {
fun doWork() {
val pool = ThreadPoolExecutor(
3, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
ArrayBlockingQueue(1000),
)
val future = CompletableFuture.supplyAsync(
{
// 1 subtask
val future1 = CompletableFuture.supplyAsync(
{
(1..10).map {
CompletableFuture.supplyAsync(SingleExternalCall(), pool)
}.sumOf { it.join() }
},
pool,
)
// 2 subtask
val future2 = CompletableFuture.supplyAsync(
{
(1..5).map {
CompletableFuture.supplyAsync(SingleExternalCall(), pool)
}.sumOf { it.join() }
},
pool,
)
// aggregate
future1.join() + future2.join()
},
pool,
)
println(future.join())
}
class SingleExternalCall : Supplier {
override fun get(): Int {
Thread.sleep(5000)
return counter.incrementAndGet().toInt()
}
}
companion object {
private val counter = AtomicLong()
}
}
< /code>
Если вы попытаетесь запустить его - приложение будет повесить. Первоначально 3 потока будут созданы
[list]
[*] Первый поток будет ждать первого завершения подзадачи .sumof {it.join ()}
Код: Выделить всё
sumOf { it.join() }
[/list]
Все остальные задачи будут в очереди, но поскольку очередь задач довольно длинная (гораздо длиннее, чем количество TAK в моих примерах), чтобы новые потоки не были созданы. Таким образом, Appliciton не имеет ресурсов для выполнения полезной работы. < /P>
Если мы предоставим как минимум 4 потока в виде основного размера пула - все работы будут выполнены < /p>
Давайте подумаем о решениях: < /p>
- Мы можем дать больше потоков. Но сколько? В один момент мы будем лимитированы по количеству потока
- Мы можем попытаться использовать executors.newcachedthreadpool () . Он создаст новый поток каждый раз, когда нам понадобится дополнительный поток (если внутри нет бесплатного потока)
- Мы можем попытаться использовать executors.newworkstealingpool ()
< /ol>
Концептуально выглядит как Forkjoinpool (который используется в NewWorkStealingPool < /code>) является лучшим, потому что у нас зависимость между задачами.
Но я Заведите несколько статей о Forkjoinpool и увидите, что есть какой -то конкретный API с явным виком /и выглядит так, как будто потребуется переписать все приложение (я хотел бы избежать его, если это возможно)Код: Выделить всё
join
Поэтому я хочу знать:
Это хорошая идея использовать forkjoinpool /в моем примере?Код: Выделить всё
Executors.newCachedThreadPool()
- Если да - какой API лучший?
Подробнее здесь: https://stackoverflow.com/questions/793 ... -better-pe