Как пакетировать IAsyncEnumerable, применяя политику максимального интервала между последовательными пакетами?C#

Место общения программистов C#
Anonymous
Как пакетировать IAsyncEnumerable, применяя политику максимального интервала между последовательными пакетами?

Сообщение Anonymous »

У меня есть асинхронная последовательность (поток) сообщений, которые приходят иногда в большом количестве, а иногда и спорадически, и я хотел бы обрабатывать их пакетами по 10 сообщений в пакете. Я также хочу установить верхний предел задержки между получением сообщения и его обработкой, поэтому пакет, содержащий менее 10 сообщений, также должен обрабатываться, если после получения первого сообщения пакета прошло 5 секунд. Я обнаружил, что могу решить первую часть проблемы, используя оператор Buffer из пакета System.Interactive.Async:

Код: Выделить всё

IAsyncEnumerable source = GetStreamOfMessages();
IAsyncEnumerable batches = source.Buffer(10);
await foreach (IList batch in batches)
{
// Process batch
}
Подпись оператора Buffer:

Код: Выделить всё

public static IAsyncEnumerable Buffer(
this IAsyncEnumerable source, int count);
К сожалению, оператор Buffer не перегружен параметром TimeSpan, поэтому вторую часть проблемы я не могу так легко решить. Придется мне самому как-то реализовать пакетный оператор с таймером. Мой вопрос: как я могу реализовать вариант оператора Buffer, который имеет подпись ниже?

Код: Выделить всё

public static IAsyncEnumerable Buffer(
this IAsyncEnumerable source, TimeSpan timeSpan, int count);
Параметр timeSpan должен влиять на поведение оператора Buffer следующим образом:
  • Пакет должен быть отправлен по истечении времени timeSpan после отправки предыдущего пакета (или первоначально после вызова метода Buffer).
  • Пустой пакет должен быть создан, если после отправки истек timeSpan предыдущий пакет, и за это время не было получено ни одного сообщения.
  • Отправка пакетов чаще, чем каждый раз, означает, что пакеты заполнены. Выдавать пакет с меньшим количеством сообщений до истечения timeSpan нежелательно.
При необходимости я могу добавить внешние зависимости в свой проект, например пакеты System.Interactive.Async или System.Linq.Async.
P.S. этот вопрос был вызван недавним вопросом, связанным с каналами и утечками памяти.

Вернуться в «C#»