Как запустить параллель. Foreachasync Loop с Nobuffering?C#

Место общения программистов C#
Ответить
Anonymous
 Как запустить параллель. Foreachasync Loop с Nobuffering?

Сообщение Anonymous »

Метод синхронной параллели. foreach имеет много перегрузки, и некоторые из них позволяют настроить параллельный цикл с помощью перечисленного partitioneroptions.nobuffering опция:
< Br /> Создайте разместителя, который берет элементы из исходного перечисленного по одному и не использует промежуточное хранилище, к которому можно получить доступ более эффективно по нескольким потокам. Этот вариант обеспечивает поддержку низкой задержки (элементы будут обрабатываться, как только они будут доступны из источника) и обеспечивает частичную поддержку зависимостей между элементами (поток не может ждет тупика, ожидая, что сам поток отвечает за обработку). /p>
< /blockquote>
Такого варианта или перегрузки не существует для асинхронной параллельной. Foreachasync < /code>. И это проблема для меня, потому что я хочу использовать этот метод с помощью канала в качестве источника, в сценарии производителя-потребителя в качестве потребителя. В моем сценарии важно, чтобы потребитель кусал именно то, что он может жевать, и не более. Я не хочу, чтобы потребитель активно тянул канал , а затем помещал вытянутые элементы в свой личный скрытый буфер. Я хочу, чтобы канал был единственной очередью в системе, чтобы я мог контролировать его и иметь точную статистику о элементах, которые ожидают обработки/потребления.
До недавнего времени у меня было впечатление, что метод параллели Но чтобы быть уверенным, я спросил Microsoft на GitHub для разъяснения. Я получил обратную связь очень быстро, но не то, что я ожидал: < /p>

Это деталь реализации. С помощью parallel.foreach < / code> буферизация выполняется для обработки делегатов тела, которые могут быть действительно быстрыми, и, таким образом, она пытается минимизировать / амортизировать стоимость заблокировки для доступа к общему перечислителю. С Foreachasync ожидается, что делегаты тела будут хотя бы немного более мясо, и поэтому он не пытается сделать такую ​​амортизацию. По крайней мере, сегодня. Поэтому я должен переосмыслить свой подход. поведение? Если да, как? Я прошу какую -то тонкую обертку вокруг существующей параллели Что -то вроде этого: < /p>

Код: Выделить всё

public static Task ForEachAsync_NoBuffering(
IAsyncEnumerable source,
ParallelOptions parallelOptions,
Func body)
{
// Some magic here
return Parallel.ForEachAsync(source, parallelOptions, body);
}
< /code>
Обертка должна вести себя точно так же с методом Parallel.foreachasync < /code> на .net 6. < /p>

[b] Обновление: [/b] Вот основная планировка моего сценария: 
class Processor
{
private readonly Channel _channel;
private readonly Task _consumer;

public Processor()
{
_channel = Channel.CreateUnbounded();
_consumer = StartConsumer();
}

public int PendingItemsCount => _channel.Reader.Count;
public Task Completion => _consumer;

public void QueueItem(Item item) => _channel.Writer.TryWrite(item);

private async Task StartConsumer()
{
ParallelOptions options = new() { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(_channel.Reader.ReadAllAsync(), options, async (item, _) =>
{
// Call async API
// Persist the response of the API in an RDBMS
});
}
}
Могут быть и другие доступные инструменты, которые также могут использоваться для этой цели, но я предпочитаю использовать Horming Hot (.net 6) parallel.foreachasync api. Это в центре внимания этого вопроса.

Подробнее здесь: https://stackoverflow.com/questions/731 ... obuffering
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»