Rxjava может быть. Dofinally () выполнение гораздо позже, чем doonsuccess () в высокопоставленной многопоточной средеJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Rxjava может быть. Dofinally () выполнение гораздо позже, чем doonsuccess () в высокопоставленной многопоточной среде

Сообщение Anonymous »

Я испытываю запутанную проблему срока с помощью времени Rxjava, возможно, операторов в высокопроизводительной службе (40 TPS) с одновременным исполнением. У меня есть два класса, которые время выполнения журнала, но они показывают очень разные времена для того, что должно быть одной и той же операцией.

Код: Выделить всё

Class A (the actual implementation):

Код: Выделить всё

public Maybe execute(Input input) {
long start = System.nanoTime();

return Maybe.fromCallable(() -> {
// Some processing that takes ~500ms
return callExternalService();
})
.doOnSuccess(decision -> {
log.info("Implementation took {} ms",
((System.nanoTime() - start) / 1000000)); // Logs ~500ms
});
}

< /code>
Class B (a decorator that wraps Class A):

Код: Выделить всё

public Maybe execute(Input input) {
return Maybe.defer(() -> {
long startTime = System.nanoTime();

return delegate.execute(input)  // Calls Class A
.doOnSuccess(decision -> {
// Other success logic
})
.doFinally(() -> {
log.info("Strategy took {} ms",
((System.nanoTime() - startTime) / 1000000)); // Logs ~1300ms
});
});
}
< /code>
[b]The Timing Issue[/b]
Class A's doOnSuccess
: журналы при ~ 500 мс ✓ (ожидается)
класс B Dofinally : журналы при ~ 1300 мс ❌ (на 800 мс позже, чем ожидалось)
, что в декораторе не будет выполнено, чем в декораторе. измерение той же операции. Журнал из Dofinally печатается где-то между ними во время нижней обработки (не в конце).
контекст
Все классы «а»-это стратегии. Стратегии выполняются исполнителем. Все стратегии высокого приоритета выполняются сначала параллельно, за которым следует средний, а затем низкие приоритетные. Класс А был стратегией с низким приоритетом, выполненной в конце. < /P>

Код: Выделить всё

@Singleton
public class StrategyExecutor {
private final Executor executor; // ThreadContextAwareExecutor with cached thread pool

public Maybe strategyExecute(Request request, /* other params */) {
return Maybe.defer(() -> {
return Observable.fromArray(HIGH, MEDIUM, LOW)
.concatMapMaybe(priority -> {
List strategies = strategyMap.get(priority);

// Convert each strategy to Maybe and run concurrently
List maybesFromStrategies = strategies.stream()
.map(strategy -> strategy.execute(input)
.subscribeOn(Schedulers.from(executor))
.onErrorComplete() // Graceful error handling
.filter(Decision::isValid))
.collect(Collectors.toList());

return Maybe.merge(maybesFromStrategies)
.firstElement();
})
.firstElement();
});
}
}

< /code>

[*]  среда высокой пропускной способности: < /p>
< /li>
  Запуск 40 TPS (транзакции в секунду) < /p>
< /li>
  Запрос на каждый раз, обрабатываемые < /p>
< /li>   . Кэшированные пулы потоков, с исполнителем, показанным ниже. < /p>
< /li>
< /ol>
Executor Used - Executors.newCachedThreadPool()

Результат исполнителя стратегии потребляется классом высшего уровня, который вызывает услуги вниз по течению. Службы вниз по течению - это очень скрытые услуги < /p>

Код: Выделить всё

public Observable processRequest(Request request) {
return strategyExecutor.strategyExecute(request, /* params */)  // Returns Maybe
.toObservable()
.flatMap(decision -> {
Service selectedService = serviceMap.get(decision.serviceName());
return selectedService.process(request);
});
}
< /code>
[b]Threading Concerns[/b]
Since we're running at high throughput with concurrent requests:

Could thread scheduling delays cause doFinally to execute much later?

[*]Is doFinally
В ожидании какой -то части нисходящей цепочки для завершения перед выполнением? (Это не происходит для стратегий с высоким/средним приоритетом)

в многопоточной среде отличается ли поведение дофинально от однопоточного исполнения? Может быть. Merge () с FirstElement (), вызывая дофинально ждать всей цепочки нижней части? TPS будет влиять на то, что дофинально выполняет? Время (не вниз по течению обработки) в этом сценарии одновременного слияния? Если я использую TimeInterval () в исполнителе стратегии и использую DOONSUCCESS для журнала, он работает, как и ожидалось.>

Подробнее здесь: https://stackoverflow.com/questions/797 ... gh-through
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»