public Mono process(MyClass myclass) {
return doSomething(myclass)
.flatMap(result -> doSomethingElse(result, myclass))
.onErrorResume(throwable -> {
myclass.setErrors(toErrors(throwable)); // errors field does not have volatile modifier
return someFallback;
})
.flatMap(result -> postprocess(result, myclass));
}
Хотя все операции в конвейере выполняются последовательно, каждый этап потенциально может выполняться в отдельном потоке (например, может быть больше этапов, обеспечивающих переходы между потоками). Это вызывает беспокойство по поводу видимости памяти: мы можем изменить объект в одном потоке и прочитать его из другого. Без надлежащей синхронизации (например, изменяемых полей или синхронизированных блоков) между этими операциями не существует гарантии «случится до того».
Насколько я понимаю, несмотря на последовательные операции в конвейере, существует потенциальный риск: изменения, внесенные в объект в одном потоке, могут быть не сразу видны другому потоку из-за отсутствия надлежащих барьеров памяти или механизмов синхронизации. Это может привести к проблемам, которые трудно обнаружить и воспроизвести.
Однако мне не удалось воспроизвести проблему. Это может быть связано с современными оптимизациями JVM и механизмами согласованности кэша (проверено на Apple Silicon и процессоре Intel) или с тем, что я ошибаюсь в своем понимании.
Вопросы:
- Правильно ли я понимаю потенциальные проблемы видимости в реактивных конвейерах? И если да:
- Может ли кто-нибудь предоставить рабочий пример, демонстрирующий эту проблему видимости с энергонезависимыми полями (со строго последовательными этапами)?
Пояснения о том, как этапы могут оказаться в разных потоках (расширенный пример):
Scheduler remoteCallsScheduler = Schedulers.newParallel("remote-calls", 1);
Scheduler dbCallsScheduler = Schedulers.newParallel("db-calls", 1);
Scheduler localProcessingScheduler = Schedulers.newParallel("local-processing", 1);
@Data
class MyClass {
String errors;
String someField;
}
String toErrors(Throwable s) {
return s.toString();
}
Mono executeOnScheduler(Mono mono, Scheduler scheduler) {
return Mono.from(mono).publishOn(scheduler).subscribeOn(scheduler);
}
Mono doSomething(MyClass myClass) {
final var mono = Mono.just(myClass.someField)
.doOnNext(__ -> {
System.out.println("doSomething is executed on " + Thread.currentThread().getName());
});
return executeOnScheduler(mono, remoteCallsScheduler);
}
Mono doSomethingElse(String s,MyClass myClass) {
final var mono = Mono.just(s)
.doOnNext(__ -> {
System.out.println("doSomethingElse is executed on " + Thread.currentThread().getName());
throw new RuntimeException("doSomethingElse throws error ");
});
return executeOnScheduler(mono, dbCallsScheduler);
}
Mono postprocess(String s,MyClass myClass) {
final var mono = Mono.just(myClass)
.doOnNext(__ -> {
System.out.println("postprocess is executed on " + Thread.currentThread().getName());
});
return executeOnScheduler(mono, localProcessingScheduler);
}
Mono someFallback = Mono.just("some fallback");
public Mono process(MyClass myclass) {
return doSomething(myclass)
.flatMap(result -> doSomethingElse(result, myclass))
.onErrorResume(throwable -> {
System.out.println("onErrorResume is executed on " + Thread.currentThread().getName());
myclass.setErrors(toErrors(throwable)); // errors field does not have volatile modifier
return someFallback;
})
.flatMap(result -> postprocess(result, myclass));
}
@Test
void test() {
final var myClass = new MyClass();
myClass.someField = "some field";
final var result = process(myClass);
result.block();
}
Тогда результат выполнения теста будет следующим:
doSomething is executed on remote-calls-1
doSomethingElse is executed on db-calls-2
onErrorResume is executed on db-calls-2
postprocess is executed on local-processing-3
Подробнее здесь: https://stackoverflow.com/questions/793 ... -pipelines