Как динамически распределить потоки по нескольким службам, используя Executorservice? [закрыто]JAVA

Программисты JAVA общаются здесь
Anonymous
Как динамически распределить потоки по нескольким службам, используя Executorservice? [закрыто]

Сообщение Anonymous »

У меня есть четыре службы, которые публикуют четыре типа транзакций на удаленный сервер, который допускает только 14 одновременных запросов одновременно. Сервис C и служба D, с другой стороны, требует как минимум по 1 потоке каждый для обработки своих транзакций. < /P>
Правила:
Каждая услуга должна получать его минимальные необходимые потоки < /strong> Когда все услуги активны. работает, в то время как другие неактивны, он должен иметь возможность использовать все 14 потоков. Но когда сервис B начинает работать, сервис ARD LEADS LEADS, обеспечивая, чтобы обе службы получали минимальное распределение (A → 6, B → 6), в то время как оставшиеся 2 потока доступны для использования A, B, если это необходимо. < /P>
Не существует строгого правила о том, как распространяются дополнительные потоки - что имеет значение. нить Но если запускается служба B, то B получает 6 потоков, D получает 1, а оставшиеся 7 потоков динамически доступны для других активных служб. Сервер.
Service C & D Запуск по требованию, запускаемые пользовательскими запросами и генерируют различные типы транзакций. < /p>
Есть идеи, как это реализовать? Вот моя первая попытка: < /p>

Код: Выделить всё

public class TaskExecutorManagerImpl implements TaskExecutorManager {
private final ExecutorService executorService;
private final Map activeServices = new ConcurrentHashMap();
private final Map serviceSemaphores = new ConcurrentHashMap();
public TaskExecutorManagerImpl() {
executorService = new ThreadPoolExecutor(
0, ServiceType.getCount(),
30, TimeUnit.SECONDS,
new SynchronousQueue(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
for (ServiceType serviceType: ServiceType.values()) {
serviceSemaphores.put(serviceType, new Semaphore(serviceType.getMinThreads()));
activeServices.put(serviceType, false);
}
}
@Override
public void activateService(ServiceType serviceType) {
activeServices.replace(serviceType, true);
}

@Override
public void inactivateService(ServiceType serviceType) {
activeServices.replace(serviceType, false);
}

@Override
public void exec(ServiceType serviceType, Runnable task) {
if (!activeServices.get(serviceType))
throw new RuntimeException(serviceType + " is inactive. Please activate it first.");

Semaphore servicePermits = acquirePermit(serviceType);

executorService.execute(() -> {
try {
task.run();
} finally {
servicePermits.release();
}
});
}

@Override
public  Future submit(ServiceType serviceType, Callable callable) {
if (!activeServices.get(serviceType))
throw new RuntimeException(serviceType + " is inactive. Please activate it first.");

Semaphore servicePermits = acquirePermit(serviceType);
return executorService.submit(() -> {
try {
return callable.call();
} finally {
servicePermits.release();
}
});
}

private Semaphore acquirePermit(ServiceType serviceType) {
Semaphore servicePermits = serviceSemaphores.get(serviceType);
while (true) {
if (servicePermits.tryAcquire()) {
return servicePermits;
} else {
Optional inactiveService = activeServices.entrySet().stream()
.filter(entry -> !entry.getValue())
.map(Map.Entry::getKey)
.filter(s -> serviceSemaphores.get(s).availablePermits() > 0)
.findFirst();
if (inactiveService.isPresent()) {
Semaphore semaphore = serviceSemaphores.get(inactiveService.get());
if (semaphore.tryAcquire())
return semaphore;
}
}
}
}
}
Я протестировал его, но это глюка, потому что он либо выполняет 14 задач или более, а затем блокирует

Подробнее здесь: https://stackoverflow.com/questions/794 ... cutorservi

Вернуться в «JAVA»