Rx.Net – Как агрегировать сообщения и выдавать промежуточные выходные данные при запуске последовательности закрытия?C#

Место общения программистов C#
Ответить
Anonymous
 Rx.Net – Как агрегировать сообщения и выдавать промежуточные выходные данные при запуске последовательности закрытия?

Сообщение Anonymous »

Цель
Проблема, которую я пытаюсь решить, — агрегирование последовательности (суммирующих значений) сообщений типа (int Key, int Value) до тех пор, пока закрывающая наблюдаемая не выдаст элемент маркера "flush".
Например, если задана последовательность элементов (Key,Value)

Код: Выделить всё

(1,1) - (2,3) - (1,3) - (Flush) - (2,1) - (2,5) - (Complete)
Когда срабатывает Flush, я ожидаю, что будет создан массив с [(1,4), (2,3)].< /p>
Когда последовательность завершена, должен быть создан массив с [(2,6)].
Что я делаю попробовал
Я начал с GroupBy + Aggregate + Buffer(flush) как интуитивно понятный способ достижения требуемого поведения. Однако последовательность выдает не промежуточные результаты, а все агрегаты в качестве окончательного результата.
Вот полный код

Код: Выделить всё

using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

var source = new Subject();
var flush = new Subject();

var completion = StartProcessing(source, flush);

source.OnNext((1, 1));
source.OnNext((2, 3));
source.OnNext((1, 3));

flush.OnNext(Unit.Default); // emit of [(1,4), (2,3)] is expected

source.OnNext((2, 1));
source.OnNext((2, 5));

source.OnCompleted(); // emit of [(2,6)] is expected

await completion;

return;

static Task StartProcessing(
IObservable source,
IObservable flush)
{
return source
.GroupBy
(
keySelector: message => message.Key
)
.SelectMany
(
group => group
.Aggregate
(
seed: (group.Key, Value: 0),
accumulator: (output, item) => (output.Key, output.Value + item.Value)
)
)
.Buffer(flush)
.Select(buffer => Observable.FromAsync(() => Flush(buffer)))
.Merge()
.ToTask();
}

static async Task Flush(IEnumerable data)
{
Console.WriteLine($"Flushing [{string.Join(", ", data)}]");

// processing takes time
await Task.Delay(TimeSpan.FromSeconds(1));
}
Вывод:

Код: Выделить всё

Flushing []
Flushing [(1, 4), (2, 9)]
Насколько я понимаю (хотя и не уверен), Buffer не выдает значения, пока последовательность не завершена, поскольку Aggregate не распространяет промежуточные значения.< /p>
Я также пытался использовать GroupByUntil вместо GroupBy и Buffer. С помощью этой структуры я получаю промежуточные выходные данные при закрытии группы, но агрегаты каждой группы выдаются один за другим, и неясно, как их объединить, чтобы они не сбрасывались отдельно.

Код: Выделить всё

Flushing (1, 4)
Flushing (2, 3)
Flushing (2, 6)
Пока я застрял и искал помощи, а именно:
  • что считается правильным - с точки зрения RX - способ реализации требуемого поведения?
  • можно ли этого добиться с помощью встроенных операторов или мне следует создать собственные (например, собственный AggregateUntil)?
Обновить
Это альтернативное решение, похоже, работает

Код: Выделить всё

static Task StartProcessing(
IObservable source,
IObservable flush)
{
return source
.GroupByUntil(_ => true, _ => flush)
.SelectMany(group => group
.Aggregate
(
seed: new Dictionary(),
accumulator: (output, item) =>
{
output[item.Key] = output.TryGetValue(item.Key, out var value)
? value + item.Value
: item.Value;

return output;
}
)
.SelectMany(buffer => Observable
.FromAsync(() => Flush(buffer.Select(x => (x.Key, x.Value)))))
)
.ToTask();
}
Пояснение
  • Код: Выделить всё

    GroupByUntil
    с помощью одного ключа для создания последовательности, которая может быть завершена по требованию путем запуска наблюдаемого «сброса». Это похоже на TakeUntil, но может создавать несколько наблюдаемых вместо одного
  • Код: Выделить всё

    Aggergate
    для выполнения накопления. После завершения группы, вызванного «сбросом», будет создан словарь с накоплениями.
  • Код: Выделить всё

    Observable.FromAsync
    создает отложенное действие, которое начнется, когда появится наблюдатель, поэтому он получит буфер с накоплениями и начнет обработку. FromAsync возвращает IObservable и выдает значение после завершения задачи
  • Код: Выделить всё

    SelectMany
    используется для объединения последовательности наблюдаемых величин в одну наблюдаемую последовательность. Он заменяет Select + Merge
    Finally ToTask возвращает задачу, которая завершится, когда последний раз будет получен модуль, указывающий, что все задачи обработки завершены


Подробнее здесь: https://stackoverflow.com/questions/783 ... losing-seq
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»