RxJava 3: Как справедливо обрабатывать несколько подписок?JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 RxJava 3: Как справедливо обрабатывать несколько подписок?

Сообщение Anonymous »

Мне нужно обрабатывать разные типы событий строго по одному, но в фоновом потоке.
Следующий код согласно документации следующий код Schedulers.from(исполнитель, ложь, правда); должен покрывать мои требования, но на самом деле это не так.
Код:

Код: Выделить всё

ExecutorService executor = Executors.newSingleThreadExecutor();
Scheduler scheduler = Schedulers.from(executor, false, true);
PublishSubject subject1 = PublishSubject.create();
PublishSubject subject2 = PublishSubject.create();

subject1.observeOn(scheduler).subscribe(log::info);
subject2.observeOn(scheduler).subscribe(log::info);

subject1.onNext("Hello11");
subject2.onNext("Hello21");
subject1.onNext("Hello12");
subject2.onNext("Hello22");
subject1.onNext("Hello13");
subject2.onNext("Hello23");

log.info("Test activity");
Имеет следующий результат:

Код: Выделить всё

22:19:05.313 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Test activity
22:19:05.313 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello11
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello12
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello13
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello21
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello22
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello23
Это показывает, что обработка событий выполняется жадным способом, когда каждый наблюдатель отдает все события перед тем, как освободить планировщик. Что противоречит документации
.
Если .observeOn(scheduler) заменить на .subscribeOn(scheduler), результат будет следующий:

Код: Выделить всё

22:23:56.162 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello11
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello21
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello12
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello22
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello13
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello23
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Test activity
Который выполняет все события в одном потоке, что противоречит всей идее .subscribeOn.
Это ошибка или есть ли способ заставить его работать так, как ожидается в документации?
Версия: io.reactivex.rxjava3:rxjava:3.1.9

Подробнее здесь: https://stackoverflow.com/questions/792 ... air-manner
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»