У меня есть источник элементов. Его можно запрашивать, и он публикует события при добавлении элементов. Другими словами, я могу сделать «холодное» наблюдаемое на основе результатов запроса, а «горячее» — на основе события.
Один из подходов — объединить «холодное» и «горячее» наблюдение.var o = cold.Merge(hot);
Это работает, но может пропустить элементы, если они были излучены между холодными и горячими наблюдаемыми, на которые подписаны.
Итак, другой подход — слияние горячего и холодного.
var o = hot.Merge(cold);
Это тоже работает и позволяет избежать пропущенных элементов, но приведет к созданию дубликатов, если элементы были созданы из горячего наблюдаемого до того, как на него была подписана холодная наблюдаемая.
А как насчет фильтрации нечетких элементов?
var o = hot.Merge(cold).Distinct()
Это решает проблему, но при этом будет поддерживаться растущий список элементов для проверки на различимость. Мне интересно найти способ добиться такого же поведения, как при использовании Distinct, но без использования все большего и большего объема памяти с течением времени.
Я считаю, что решение выглядит примерно так:Подпишитесь на тему в горячем обозримом. Подпишите холодную наблюдаемую наблюдателю. Приостановите предмет (?), затем воспроизведите предмет наблюдателю, но с проверкой на различимость. Наконец, снимите паузу (или как это сработает) субъекта и подпишите его на наблюдателя (без проверки различимости).
Ниже приведен пример кода, демонстрирующий описанное выше поведение. Вы можете запустить его здесь, если у вас нет компилятора.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;
var strategies = new[]
{
WarmObservable.WithMissesFrom,
WarmObservable.WithDuplicatesFrom,
WarmObservable.WithCorrectButHighMemoryUsageFrom,
//WarmObservable.From
};
foreach (var strategy in strategies)
{
var observed = new List();
var nums = new ObservableAddOnlyCollection();
var warm = strategy(nums, nums.Added);
const Int32 max = 1_000_000;
Parallel.For(0, max, i =>
{
nums.Add(i);
if (i == max / 2)
warm.Subscribe(observed.Add);
});
Console.WriteLine($"{observed.Count:N0} elements observed using {strategy.Method.Name}.");
}
public sealed class ObservableAddOnlyCollection : IObservable
{
private readonly List items = new();
private event Action added = _ => {};
public IObservable Added => Observable.FromEvent(h => added += h, h => added -= h);
public void Add(T item) { lock (items) items.Add(item); added.Invoke(item); }
public IDisposable Subscribe(IObserver observer) => ToList().ToObservable().Subscribe(observer);
private List ToList() { lock (items) return items.ToList(); }
}
public static class WarmObservable
{
public static IObservable WithMissesFrom(IObservable cold, IObservable hot)
{
return cold.Merge(hot);
}
public static IObservable WithDuplicatesFrom(IObservable cold, IObservable hot)
{
return hot.Merge(cold);
}
public static IObservable WithCorrectButHighMemoryUsageFrom(IObservable cold, IObservable hot)
{
return hot.Merge(cold).Distinct();
}
public static IObservable From(IObservable cold, IObservable hot)
{
// merge a cold observable with a hot observable, without losing any elements and without duplicating any elements
// also don't require an increasing memory footprint to distinguish duplicates
throw new NotImplementedException();
}
}
Подробнее здесь: https://stackoverflow.com/questions/790 ... ng-duplica
Как объединить горячие и холодные наблюдаемые одного и того же источника, эффективно избегая дублирования? ⇐ C#
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Как объединить панды DataFrames с многими ко многим отношениям, избегая столкновений?
Anonymous » » в форуме Python - 0 Ответы
- 17 Просмотры
-
Последнее сообщение Anonymous
-