Я новичок в реактивном программировании и хочу убедиться, что мыслю реактивно. Проблема, которую я пытаюсь решить, прямолинейна. У меня поток с относительно высокой пропускной способностью (~ 100 000 событий в секунду). При этом события фильтруются на основе кэшей динамических значений. Например:
Код: Выделить всё
Set concurrentUuidSet = ConcurrentHashMap.newKeySet();
populate(concurrentUuidSet);
Flowable.fromPublisher(dataStream)
.filter(msg -> { concurrentUuidSet.contains(msg.getUuid()) })
.subscribe(msg -> { System.out.println(msg.getData()) });
Набор относительно большой и требует динамического обновления из другого потока (хотя и не очень часто). Есть ли в rx оптимальный шаблон для чего-то подобного? Приведенный выше код хорош, но, похоже, он нарушает контракт rx. Я подумывал об использовании комбинации объекта Behavior для кэширования набора и FlatMap для сопоставления последней версии с каждым событием. Имеет ли что-то подобное смысл? Любые мысли сообщества будут оценены по достоинству. Спасибо!
Подробнее здесь:
https://stackoverflow.com/questions/793 ... -of-values