Объединение двух экземпляров IAsyncEnumerable с поддержкой отмены (NotSupportedException)C#

Место общения программистов C#
Ответить Пред. темаСлед. тема
Anonymous
 Объединение двух экземпляров IAsyncEnumerable с поддержкой отмены (NotSupportedException)

Сообщение Anonymous »

У меня возникли проблемы с правильной реализацией следующего:

У меня есть два экземпляра IAsyncEnumerable (одного типа), один из которых является «основным» источником, цель другого заключается в том, что пользователь может «вставлять» элементы по требованию.

Поэтому, если основной источник заканчивается, вторичный также должен остановиться, заканчивая перечисление.
Вызывающей функции Merge также необходимо чтобы иметь возможность реагировать, когда основной источник заканчивается (т.е. очищать вторичное перечисляемое).
Учитывая это, я придумал следующую реализацию:
static async IAsyncEnumerable Merge(IAsyncEnumerable primary, IAsyncEnumerable secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);

await using var _pIt = pIt.ConfigureAwait(false);
await using var _sIt = sIt.ConfigureAwait(false);

Task? pItTask = null;
Task? sItTask = null;

IAsyncEnumerator? it = null;
while (true) {
Task task;

try {

pItTask ??= pIt.MoveNextAsync().AsTask();
sItTask ??= sIt.MoveNextAsync().AsTask();

task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
if (pItTask == task) {
pItTask = null;
it = pIt;
if (!task.Result) {
primaryFinished();
yield break;
}

} else {

sItTask = null;
it = sIt;
}
} catch (Exception) {
primaryFinished();
yield break;
}

if (task.Result) yield return it.Current;
}
}

Проблема в том, что это вызовет исключение NotSupportedException, если основной источник выдает исключение, что портит мою обработку исключений при подъеме по стеку.
Я обнаружил проблему на GitHub, в которой упоминается проблема, но на самом деле она не решается (исключение появляется даже с задержкой).
Полный код для воспроизведения проблема:

https://dotnetfiddle.net/v7SgEV
//Needs the "System.Interactive.Async" nuget package
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using System.Threading.Channels;

while (true) {
try {
using var cts = new CancellationTokenSource();

var secondaryDataSource = Channel.CreateUnbounded();

var reader = PrimaryDataSource(cts.Token).Do(x => {
Console.WriteLine("---");
});

var combinedSource = Merge(
reader,
secondaryDataSource.Reader.ReadAllAsync(cts.Token),
() => secondaryDataSource.Writer.TryComplete(),
cts.Token
);

await foreach (var item in combinedSource) {
Console.WriteLine(item ?? "");
throw new Exception();
}

} catch (NotSupportedException ex) {
throw;
} catch (Exception) {
}
}

static async IAsyncEnumerable PrimaryDataSource([EnumeratorCancellation] CancellationToken ct) {
while (true) {
await Task.Delay(Random.Shared.Next(100), ct).ConfigureAwait(false);
yield return "PrimaryDataSource";

throw new Exception();
yield break;
}
}

static async IAsyncEnumerable Merge(IAsyncEnumerable primary, IAsyncEnumerable secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);

await using var _pIt = pIt.ConfigureAwait(false);
await using var _sIt = sIt.ConfigureAwait(false);

Task? pItTask = null;
Task? sItTask = null;

IAsyncEnumerator? it = null;
while (true) {
Task task;

try {

pItTask ??= pIt.MoveNextAsync().AsTask();
sItTask ??= sIt.MoveNextAsync().AsTask();

task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
if (pItTask == task) {
pItTask = null;
it = pIt;
if (!task.Result) {
primaryFinished();
yield break;
}

} else {

sItTask = null;
it = sIt;
}
} catch (Exception) {
primaryFinished();
if (pItTask != null) await pItTask;
if (sItTask != null) await sItTask;
yield break;
}

if (task.Result) yield return it.Current;
}
}


Подробнее здесь: https://stackoverflow.com/questions/790 ... upportedex
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Объединение двух экземпляров IAsyncEnumerable с поддержкой отмены (NotSupportedException)
    Anonymous » » в форуме C#
    0 Ответы
    18 Просмотры
    Последнее сообщение Anonymous
  • Объединение двух экземпляров IAsyncEnumerable с поддержкой отмены (NotSupportedException)
    Anonymous » » в форуме C#
    0 Ответы
    38 Просмотры
    Последнее сообщение Anonymous
  • Итерация IAsyncenumerable в функции, возвращающей IAsyncenumerable с отменой
    Anonymous » » в форуме C#
    0 Ответы
    25 Просмотры
    Последнее сообщение Anonymous
  • Удаление IAsyncEnumerator не отменяет базовый токен отмены IAsyncEnumerable.
    Anonymous » » в форуме C#
    0 Ответы
    26 Просмотры
    Последнее сообщение Anonymous
  • Удаление IAsyncEnumerator не отменяет базовый токен отмены IAsyncEnumerable.
    Anonymous » » в форуме C#
    0 Ответы
    26 Просмотры
    Последнее сообщение Anonymous

Вернуться в «C#»