Проблема, которую я пытаюсь решить, — агрегирование последовательности (суммирующих значений) сообщений типа (int Key, int Value) до тех пор, пока закрывающая наблюдаемая не выдаст элемент маркера "flush".
Например, если задана последовательность элементов (Key,Value)
Код: Выделить всё
(1,1) - (2,3) - (1,3) - (Flush) - (2,1) - (2,5) - (Complete)Когда последовательность завершена, должен быть создан массив с [(2,6)].
Что я делаю попробовал
Я начал с GroupBy + Aggregate + Buffer(flush) как интуитивно понятный способ достижения требуемого поведения. Однако последовательность выдает не промежуточные результаты, а все агрегаты в качестве окончательного результата.
Вот полный код
Код: Выделить всё
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
var source = new Subject();
var flush = new Subject();
var completion = StartProcessing(source, flush);
source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));
flush.OnNext(Unit.Default); // emit of [(1,4), (2,3)] is expected
source.OnNext((2, 1));
source.OnNext((2, 5));
source.OnCompleted(); // emit of [(2,6)] is expected
await completion;
return;
static Task StartProcessing(
IObservable source,
IObservable flush)
{
return source
.GroupBy
(
keySelector: message => message.Key
)
.SelectMany
(
group => group
.Aggregate
(
seed: (group.Key, Value: 0),
accumulator: (output, item) => (output.Key, output.Value + item.Value)
)
)
.Buffer(flush)
.Select(buffer => Observable.FromAsync(() => Flush(buffer)))
.Merge()
.ToTask();
}
static async Task Flush(IEnumerable data)
{
Console.WriteLine($"Flushing [{string.Join(", ", data)}]");
// processing takes time
await Task.Delay(TimeSpan.FromSeconds(1));
}
Код: Выделить всё
Flushing []
Flushing [(1, 4), (2, 9)]
Я также пытался использовать GroupByUntil вместо GroupBy и Buffer. С помощью этой структуры я получаю промежуточные выходные данные при закрытии группы, но агрегаты каждой группы выдаются один за другим, и неясно, как их объединить, чтобы они не сбрасывались отдельно.
Код: Выделить всё
Flushing (1, 4)
Flushing (2, 3)
Flushing (2, 6)
- что считается правильным - с точки зрения RX - способ реализации требуемого поведения?
- можно ли этого добиться с помощью встроенных операторов или мне следует создать собственные (например, собственный AggregateUntil)?
Это альтернативное решение, похоже, работает
Код: Выделить всё
static Task StartProcessing(
IObservable source,
IObservable flush)
{
return source
.GroupByUntil(_ => true, _ => flush)
.SelectMany(group => group
.Aggregate
(
seed: new Dictionary(),
accumulator: (output, item) =>
{
output[item.Key] = output.TryGetValue(item.Key, out var value)
? value + item.Value
: item.Value;
return output;
}
)
.SelectMany(buffer => Observable
.FromAsync(() => Flush(buffer.Select(x => (x.Key, x.Value)))))
)
.ToTask();
}
- с помощью одного ключа для создания последовательности, которая может быть завершена по требованию путем запуска наблюдаемого «сброса». Это похоже на TakeUntil, но может создавать несколько наблюдаемых вместо одного
Код: Выделить всё
GroupByUntil - для выполнения накопления. После завершения группы, вызванного «сбросом», будет создан словарь с накоплениями.
Код: Выделить всё
Aggergate - создает отложенное действие, которое начнется, когда появится наблюдатель, поэтому он получит буфер с накоплениями и начнет обработку. FromAsync возвращает IObservable и выдает значение после завершения задачи
Код: Выделить всё
Observable.FromAsync - используется для объединения последовательности наблюдаемых величин в одну наблюдаемую последовательность. Он заменяет Select + Merge
Код: Выделить всё
SelectMany
Finally ToTask возвращает задачу, которая завершится, когда последний раз будет получен модуль, указывающий, что все задачи обработки завершены
Подробнее здесь: https://stackoverflow.com/questions/783 ... losing-seq
Мобильная версия