У меня есть асинхронная последовательность (поток) сообщений, которые приходят иногда в большом количестве, а иногда и спорадически, и я хотел бы обрабатывать их пакетами по 10 сообщений в пакете. Я также хочу установить верхний предел задержки между получением сообщения и его обработкой, поэтому пакет, содержащий менее 10 сообщений, также должен обрабатываться, если после получения первого сообщения пакета прошло 5 секунд. Я обнаружил, что могу решить первую часть проблемы, используя оператор Buffer из пакета System.Interactive.Async:
Код: Выделить всё
IAsyncEnumerable source = GetStreamOfMessages();
IAsyncEnumerable batches = source.Buffer(10);
await foreach (IList batch in batches)
{
// Process batch
}
Подпись оператора Buffer:
Код: Выделить всё
public static IAsyncEnumerable Buffer(
this IAsyncEnumerable source, int count);
К сожалению, оператор Buffer не перегружен параметром TimeSpan, поэтому вторую часть проблемы я не могу так легко решить. Придется мне самому как-то реализовать пакетный оператор с таймером. Мой вопрос: как я могу реализовать вариант оператора Buffer, который имеет подпись ниже?
Код: Выделить всё
public static IAsyncEnumerable Buffer(
this IAsyncEnumerable source, TimeSpan timeSpan, int count);
Параметр timeSpan должен влиять на поведение оператора Buffer следующим образом:
- Пакет должен быть отправлен по истечении времени timeSpan после отправки предыдущего пакета (или первоначально после вызова метода Buffer).
- Пустой пакет должен быть создан, если после отправки истек timeSpan предыдущий пакет, и за это время не было получено ни одного сообщения.
- Отправка пакетов чаще, чем каждый раз, означает, что пакеты заполнены. Выдавать пакет с меньшим количеством сообщений до истечения timeSpan нежелательно.
При необходимости я могу добавить внешние зависимости в свой проект, например пакеты System.Interactive.Async или System.Linq.Async.
P.S. этот вопрос был вызван недавним вопросом, связанным с каналами и утечками памяти.