Код: Выделить всё
Class A (the actual implementation):Код: Выделить всё
public Maybe execute(Input input) {
long start = System.nanoTime();
return Maybe.fromCallable(() -> {
// Some processing that takes ~500ms
return callExternalService();
})
.doOnSuccess(decision -> {
log.info("Implementation took {} ms",
((System.nanoTime() - start) / 1000000)); // Logs ~500ms
});
}
< /code>
Class B (a decorator that wraps Class A):Код: Выделить всё
public Maybe execute(Input input) {
return Maybe.defer(() -> {
long startTime = System.nanoTime();
return delegate.execute(input) // Calls Class A
.doOnSuccess(decision -> {
// Other success logic
})
.doFinally(() -> {
log.info("Strategy took {} ms",
((System.nanoTime() - startTime) / 1000000)); // Logs ~1300ms
});
});
}
< /code>
[b]The Timing Issue[/b]
Class A's doOnSuccessкласс B Dofinally : журналы при ~ 1300 мс
, что в декораторе не будет выполнено, чем в декораторе. измерение той же операции. Журнал из Dofinally печатается где-то между ними во время нижней обработки (не в конце).
контекст
Все классы «а»-это стратегии. Стратегии выполняются исполнителем. Все стратегии высокого приоритета выполняются сначала параллельно, за которым следует средний, а затем низкие приоритетные. Класс А был стратегией с низким приоритетом, выполненной в конце. < /P>
Код: Выделить всё
@Singleton
public class StrategyExecutor {
private final Executor executor; // ThreadContextAwareExecutor with cached thread pool
public Maybe strategyExecute(Request request, /* other params */) {
return Maybe.defer(() -> {
return Observable.fromArray(HIGH, MEDIUM, LOW)
.concatMapMaybe(priority -> {
List strategies = strategyMap.get(priority);
// Convert each strategy to Maybe and run concurrently
List maybesFromStrategies = strategies.stream()
.map(strategy -> strategy.execute(input)
.subscribeOn(Schedulers.from(executor))
.onErrorComplete() // Graceful error handling
.filter(Decision::isValid))
.collect(Collectors.toList());
return Maybe.merge(maybesFromStrategies)
.firstElement();
})
.firstElement();
});
}
}
< /code>
[*] среда высокой пропускной способности: < /p>
< /li>
Запуск 40 TPS (транзакции в секунду) < /p>
< /li>
Запрос на каждый раз, обрабатываемые < /p>
< /li> . Кэшированные пулы потоков, с исполнителем, показанным ниже. < /p>
< /li>
< /ol>
Executor Used - Executors.newCachedThreadPool()Результат исполнителя стратегии потребляется классом высшего уровня, который вызывает услуги вниз по течению. Службы вниз по течению - это очень скрытые услуги < /p>
Код: Выделить всё
public Observable processRequest(Request request) {
return strategyExecutor.strategyExecute(request, /* params */) // Returns Maybe
.toObservable()
.flatMap(decision -> {
Service selectedService = serviceMap.get(decision.serviceName());
return selectedService.process(request);
});
}
< /code>
[b]Threading Concerns[/b]
Since we're running at high throughput with concurrent requests:
Could thread scheduling delays cause doFinally to execute much later?
[*]Is doFinallyв многопоточной среде отличается ли поведение дофинально от однопоточного исполнения? Может быть. Merge () с FirstElement (), вызывая дофинально ждать всей цепочки нижней части? TPS будет влиять на то, что дофинально выполняет? Время (не вниз по течению обработки) в этом сценарии одновременного слияния? Если я использую TimeInterval () в исполнителе стратегии и использую DOONSUCCESS для журнала, он работает, как и ожидалось.>
Подробнее здесь: https://stackoverflow.com/questions/797 ... gh-through
Мобильная версия