Объединение двух экземпляров IAsyncEnumerable с поддержкой отмены (NotSupportedException)C#

Место общения программистов C#
Ответить
Anonymous
 Объединение двух экземпляров IAsyncEnumerable с поддержкой отмены (NotSupportedException)

Сообщение Anonymous »

У меня возникли проблемы с правильной реализацией следующего:

У меня есть два экземпляра IAsynEnumerable (одного типа), один из которых является «основным» источником, цель другого — предоставить пользователю может «вставлять» элементы по требованию.

Поэтому, если основной источник заканчивается, вторичный также должен остановиться, завершив перечисление.
Вызывающая функция Merge также должна иметь возможность реагировать, когда основной источник заканчивается (т. е. очищать вторичное перечисляемое).
Учитывая это, я придумал следующую реализацию:
static async IAsyncEnumerable Merge(IAsyncEnumerable primary, IAsyncEnumerable secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);

await using var _pIt = pIt.ConfigureAwait(false);
await using var _sIt = sIt.ConfigureAwait(false);

Task? pItTask = null;
Task? sItTask = null;

IAsyncEnumerator? it = null;
while (true) {
Task task;

try {

pItTask ??= pIt.MoveNextAsync().AsTask();
sItTask ??= sIt.MoveNextAsync().AsTask();

task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
if (pItTask == task) {
pItTask = null;
it = pIt;
if (!task.Result) {
primaryFinished();
yield break;
}

} else {

sItTask = null;
it = sIt;
}
} catch (Exception) {
primaryFinished();
yield break;
}

if (task.Result) yield return it.Current;
}
}

Проблема в том, что это вызовет исключение NotSupportedException, если основной источник выдает исключение, что портит мою обработку исключений при подъеме по стеку.
Я обнаружил проблему на GitHub, в которой упоминается проблема, но на самом деле она не решается (исключение появляется даже с задержкой).
Полный код для воспроизведения проблема:

https://dotnetfiddle.net/v7SgEV
//Needs the "System.Interactive.Async" nuget package
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using System.Threading.Channels;

while (true) {
try {
using var cts = new CancellationTokenSource();

var secondaryDataSource = Channel.CreateUnbounded();

var reader = PrimaryDataSource(cts.Token).Do(x => {
Console.WriteLine("---");
});

var combinedSource = Merge(
reader,
secondaryDataSource.Reader.ReadAllAsync(cts.Token),
() => secondaryDataSource.Writer.TryComplete(),
cts.Token
);

await foreach (var item in combinedSource) {
Console.WriteLine(item ?? "");
throw new Exception();
}

} catch (NotSupportedException ex) {
throw;
} catch (Exception) {
}
}

static async IAsyncEnumerable PrimaryDataSource([EnumeratorCancellation] CancellationToken ct) {
while (true) {
await Task.Delay(Random.Shared.Next(100), ct).ConfigureAwait(false);
yield return "PrimaryDataSource";

throw new Exception();
yield break;
}
}

static async IAsyncEnumerable Merge(IAsyncEnumerable primary, IAsyncEnumerable secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);

await using var _pIt = pIt.ConfigureAwait(false);
await using var _sIt = sIt.ConfigureAwait(false);

Task? pItTask = null;
Task? sItTask = null;

IAsyncEnumerator? it = null;
while (true) {
Task task;

try {

pItTask ??= pIt.MoveNextAsync().AsTask();
sItTask ??= sIt.MoveNextAsync().AsTask();

task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
if (pItTask == task) {
pItTask = null;
it = pIt;
if (!task.Result) {
primaryFinished();
yield break;
}

} else {

sItTask = null;
it = sIt;
}
} catch (Exception) {
primaryFinished();
if (pItTask != null) await pItTask;
if (sItTask != null) await sItTask;
yield break;
}

if (task.Result) yield return it.Current;
}
}


Подробнее здесь: https://stackoverflow.com/questions/790 ... upportedex
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «C#»