У меня возникли проблемы с правильной реализацией следующего:
У меня есть два экземпляра IAsyncEnumerable (одного типа), один из которых является «основным» источником, цель другого заключается в том, что пользователь может «вставлять» элементы по требованию.
Поэтому, если основной источник заканчивается, вторичный также должен остановиться, заканчивая перечисление.
Вызывающей функции Merge также необходимо чтобы иметь возможность реагировать, когда основной источник заканчивается (т.е. очищать вторичное перечисляемое).
Учитывая это, я придумал следующую реализацию:
static async IAsyncEnumerable Merge(IAsyncEnumerable primary, IAsyncEnumerable secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);
await using var _pIt = pIt.ConfigureAwait(false);
await using var _sIt = sIt.ConfigureAwait(false);
Task? pItTask = null;
Task? sItTask = null;
IAsyncEnumerator? it = null;
while (true) {
Task task;
try {
pItTask ??= pIt.MoveNextAsync().AsTask();
sItTask ??= sIt.MoveNextAsync().AsTask();
task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
if (pItTask == task) {
pItTask = null;
it = pIt;
if (!task.Result) {
primaryFinished();
yield break;
}
} else {
sItTask = null;
it = sIt;
}
} catch (Exception) {
primaryFinished();
yield break;
}
if (task.Result) yield return it.Current;
}
}
Проблема в том, что это вызовет исключение NotSupportedException, если основной источник выдаст исключение, что испортит мою обработку исключений при подъеме по стеку.
Я нашел проблему на GitHub, в которой упоминается проблема, но на самом деле она не решается (исключение появляется даже с задержкой).
Полный код для воспроизведения проблема:
https://dotnetfiddle.net/v7SgEV
//Needs the "System.Interactive.Async" nuget package
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
while (true) {
try {
using var cts = new CancellationTokenSource();
var secondaryDataSource = Channel.CreateUnbounded();
var reader = PrimaryDataSource(cts.Token).Do(x => {
Console.WriteLine("---");
});
var combinedSource = Merge(
reader,
secondaryDataSource.Reader.ReadAllAsync(cts.Token),
() => secondaryDataSource.Writer.TryComplete(),
cts.Token
);
await foreach (var item in combinedSource) {
Console.WriteLine(item ?? "");
throw new Exception();
}
} catch (NotSupportedException ex) {
throw;
} catch (Exception) {
}
}
static async IAsyncEnumerable PrimaryDataSource([EnumeratorCancellation] CancellationToken ct) {
while (true) {
await Task.Delay(Random.Shared.Next(100), ct).ConfigureAwait(false);
yield return "PrimaryDataSource";
throw new Exception();
yield break;
}
}
static async IAsyncEnumerable Merge(IAsyncEnumerable primary, IAsyncEnumerable secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);
await using var _pIt = pIt.ConfigureAwait(false);
await using var _sIt = sIt.ConfigureAwait(false);
Task? pItTask = null;
Task? sItTask = null;
IAsyncEnumerator? it = null;
while (true) {
Task task;
try {
pItTask ??= pIt.MoveNextAsync().AsTask();
sItTask ??= sIt.MoveNextAsync().AsTask();
task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
if (pItTask == task) {
pItTask = null;
it = pIt;
if (!task.Result) {
primaryFinished();
yield break;
}
} else {
sItTask = null;
it = sIt;
}
} catch (Exception) {
primaryFinished();
if (pItTask != null) await pItTask;
if (sItTask != null) await sItTask;
yield break;
}
if (task.Result) yield return it.Current;
}
}
Подробнее здесь: https://stackoverflow.com/questions/790 ... upportedex
Объединение двух экземпляров IAsyncEnumerable с поддержкой отмены (NotSupportedException) ⇐ C#
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Объединение двух экземпляров IAsyncEnumerable с поддержкой отмены (NotSupportedException)
Anonymous » » в форуме C# - 0 Ответы
- 18 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Объединение двух экземпляров IAsyncEnumerable с поддержкой отмены (NotSupportedException)
Anonymous » » в форуме C# - 0 Ответы
- 17 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Итерация IAsyncenumerable в функции, возвращающей IAsyncenumerable с отменой
Anonymous » » в форуме C# - 0 Ответы
- 24 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Удаление IAsyncEnumerator не отменяет базовый токен отмены IAsyncEnumerable.
Anonymous » » в форуме C# - 0 Ответы
- 26 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Удаление IAsyncEnumerator не отменяет базовый токен отмены IAsyncEnumerable.
Anonymous » » в форуме C# - 0 Ответы
- 26 Просмотры
-
Последнее сообщение Anonymous
-