Я использую канал в сценарии производителя-потребителя, и у меня есть требование поглотить канал в партиях по 10 пунктов каждый и не позволяя ни одного потребляемого предмета оставаться в буфере в течение более 5 секунд. Эта продолжительность является максимальной задержкой между чтением элемента с канала и обработкой партии, содержащей этот элемент. Политика максимальной задержки имеет приоритет по сравнению с политикой пакетного размера, поэтому партия должна обрабатываться даже с менее чем 10 элементами, чтобы удовлетворить требование максимальной задержки.
Код: Выделить всё
public static async IAsyncEnumerable ReadAllBatches(
this ChannelReader channelReader, int batchSize)
{
List buffer = new();
while (true)
{
T item;
try { item = await channelReader.ReadAsync(); }
catch (ChannelClosedException) { break; }
buffer.Add(item);
if (buffer.Count == batchSize)
{
yield return buffer.ToArray();
buffer.Clear();
}
}
if (buffer.Count > 0) yield return buffer.ToArray();
await channelReader.Completion; // Propagate possible failure
}
< /code>
Я планирую использовать его как: < /p>
await foreach (Item[] batch in myChannel.Reader.ReadAllBatches(10))
{
Console.WriteLine($"Processing batch of {batch.Length} items");
}
Мой вопрос: Как я могу улучшить свой параметр readallbatches с помощью дополнительного тайм-аута временного пространства , который обеспечивает соблюдение вышеупомянутой политики максимальной латентности и без установки сторонних пакетов для моего проекта? к проблеме утечки памяти, о которой здесь сообщалось. Таким образом, цикл, который потребляет канал, не должен вызывать устойчивый прирост памяти, используемой приложением, в случае, если продюсер, который записывает элементы в канале, стал холостом в течение длительного периода времени.
Примечание: Я знаю, что аналогичный вопрос существует относительно пакетирования iasyncenumerable
Подробнее здесь:
https://stackoverflow.com/questions/723 ... etween-con