< Br /> Создайте разместителя, который берет элементы из исходного перечисленного по одному и не использует промежуточное хранилище, к которому можно получить доступ более эффективно по нескольким потокам. Этот вариант обеспечивает поддержку низкой задержки (элементы будут обрабатываться, как только они будут доступны из источника) и обеспечивает частичную поддержку зависимостей между элементами (поток не может ждет тупика, ожидая, что сам поток отвечает за обработку). /p>
< /blockquote>
Такого варианта или перегрузки не существует для асинхронной параллельной. Foreachasync < /code>. И это проблема для меня, потому что я хочу использовать этот метод с помощью канала в качестве источника, в сценарии производителя-потребителя в качестве потребителя. В моем сценарии важно, чтобы потребитель кусал именно то, что он может жевать, и не более. Я не хочу, чтобы потребитель активно тянул канал , а затем помещал вытянутые элементы в свой личный скрытый буфер. Я хочу, чтобы канал был единственной очередью в системе, чтобы я мог контролировать его и иметь точную статистику о элементах, которые ожидают обработки/потребления.
До недавнего времени у меня было впечатление, что метод параллели Но чтобы быть уверенным, я спросил Microsoft на GitHub для разъяснения. Я получил обратную связь очень быстро, но не то, что я ожидал: < /p>
Это деталь реализации. С помощью parallel.foreach < / code> буферизация выполняется для обработки делегатов тела, которые могут быть действительно быстрыми, и, таким образом, она пытается минимизировать / амортизировать стоимость заблокировки для доступа к общему перечислителю. С Foreachasync ожидается, что делегаты тела будут хотя бы немного более мясо, и поэтому он не пытается сделать такую амортизацию. По крайней мере, сегодня. Поэтому я должен переосмыслить свой подход. поведение? Если да, как? Я прошу какую -то тонкую обертку вокруг существующей параллели Что -то вроде этого: < /p>
Код: Выделить всё
public static Task ForEachAsync_NoBuffering(
IAsyncEnumerable source,
ParallelOptions parallelOptions,
Func body)
{
// Some magic here
return Parallel.ForEachAsync(source, parallelOptions, body);
}
< /code>
Обертка должна вести себя точно так же с методом Parallel.foreachasync < /code> на .net 6. < /p>
[b] Обновление: [/b] Вот основная планировка моего сценария:
class Processor
{
private readonly Channel _channel;
private readonly Task _consumer;
public Processor()
{
_channel = Channel.CreateUnbounded();
_consumer = StartConsumer();
}
public int PendingItemsCount => _channel.Reader.Count;
public Task Completion => _consumer;
public void QueueItem(Item item) => _channel.Writer.TryWrite(item);
private async Task StartConsumer()
{
ParallelOptions options = new() { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(_channel.Reader.ReadAllAsync(), options, async (item, _) =>
{
// Call async API
// Persist the response of the API in an RDBMS
});
}
}
Подробнее здесь: https://stackoverflow.com/questions/731 ... obuffering
Мобильная версия