У меня возникли проблемы с правильной реализацией следующего:
У меня есть два экземпляра IAsynEnumerable (одного типа), один из которых является «основным» источником, цель другого — предоставить пользователю может «вставлять» элементы по требованию.
Поэтому, если основной источник заканчивается, вторичный также должен остановиться, завершив перечисление.
Вызывающая функция Merge также должна иметь возможность реагировать, когда основной источник заканчивается (т. е. очищать вторичное перечисляемое).
Учитывая это, я придумал следующую реализацию:
static async IAsyncEnumerable Merge(IAsyncEnumerable primary, IAsyncEnumerable secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);
await using var _pIt = pIt.ConfigureAwait(false);
await using var _sIt = sIt.ConfigureAwait(false);
Task? pItTask = null;
Task? sItTask = null;
IAsyncEnumerator? it = null;
while (true) {
Task task;
try {
pItTask ??= pIt.MoveNextAsync().AsTask();
sItTask ??= sIt.MoveNextAsync().AsTask();
task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
if (pItTask == task) {
pItTask = null;
it = pIt;
if (!task.Result) {
primaryFinished();
yield break;
}
} else {
sItTask = null;
it = sIt;
}
} catch (Exception) {
primaryFinished();
yield break;
}
if (task.Result) yield return it.Current;
}
}
Проблема в том, что это вызовет исключение NotSupportedException, если основной источник выдает исключение, что портит мою обработку исключений при подъеме по стеку.
Я обнаружил проблему на GitHub, в которой упоминается проблема, но на самом деле она не решается (исключение появляется даже с задержкой).
Полный код для воспроизведения проблема:
https://dotnetfiddle.net/v7SgEV
//Needs the "System.Interactive.Async" nuget package
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
while (true) {
try {
using var cts = new CancellationTokenSource();
var secondaryDataSource = Channel.CreateUnbounded();
var reader = PrimaryDataSource(cts.Token).Do(x => {
Console.WriteLine("---");
});
var combinedSource = Merge(
reader,
secondaryDataSource.Reader.ReadAllAsync(cts.Token),
() => secondaryDataSource.Writer.TryComplete(),
cts.Token
);
await foreach (var item in combinedSource) {
Console.WriteLine(item ?? "");
throw new Exception();
}
} catch (NotSupportedException ex) {
throw;
} catch (Exception) {
}
}
static async IAsyncEnumerable PrimaryDataSource([EnumeratorCancellation] CancellationToken ct) {
while (true) {
await Task.Delay(Random.Shared.Next(100), ct).ConfigureAwait(false);
yield return "PrimaryDataSource";
throw new Exception();
yield break;
}
}
static async IAsyncEnumerable Merge(IAsyncEnumerable primary, IAsyncEnumerable secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);
await using var _pIt = pIt.ConfigureAwait(false);
await using var _sIt = sIt.ConfigureAwait(false);
Task? pItTask = null;
Task? sItTask = null;
IAsyncEnumerator? it = null;
while (true) {
Task task;
try {
pItTask ??= pIt.MoveNextAsync().AsTask();
sItTask ??= sIt.MoveNextAsync().AsTask();
task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
if (pItTask == task) {
pItTask = null;
it = pIt;
if (!task.Result) {
primaryFinished();
yield break;
}
} else {
sItTask = null;
it = sIt;
}
} catch (Exception) {
primaryFinished();
if (pItTask != null) await pItTask;
if (sItTask != null) await sItTask;
yield break;
}
if (task.Result) yield return it.Current;
}
}
Подробнее здесь: https://stackoverflow.com/questions/790 ... upportedex
Объединение двух экземпляров IAsyncEnumerable с поддержкой отмены (NotSupportedException) ⇐ C#
Место общения программистов C#
1729147825
Anonymous
У меня возникли проблемы с правильной реализацией следующего:
У меня есть два экземпляра IAsynEnumerable (одного типа), один из которых является «основным» источником, цель другого — предоставить пользователю может «вставлять» элементы по требованию.
Поэтому, если основной источник заканчивается, вторичный также должен остановиться, завершив перечисление.
Вызывающая функция Merge также должна иметь возможность реагировать, когда основной источник заканчивается (т. е. очищать вторичное перечисляемое).
Учитывая это, я придумал следующую реализацию:
static async IAsyncEnumerable Merge(IAsyncEnumerable primary, IAsyncEnumerable secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);
await using var _pIt = pIt.ConfigureAwait(false);
await using var _sIt = sIt.ConfigureAwait(false);
Task? pItTask = null;
Task? sItTask = null;
IAsyncEnumerator? it = null;
while (true) {
Task task;
try {
pItTask ??= pIt.MoveNextAsync().AsTask();
sItTask ??= sIt.MoveNextAsync().AsTask();
task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
if (pItTask == task) {
pItTask = null;
it = pIt;
if (!task.Result) {
primaryFinished();
yield break;
}
} else {
sItTask = null;
it = sIt;
}
} catch (Exception) {
primaryFinished();
yield break;
}
if (task.Result) yield return it.Current;
}
}
Проблема в том, что это вызовет исключение NotSupportedException, если основной источник выдает исключение, что портит мою обработку исключений при подъеме по стеку.
Я обнаружил проблему на GitHub, в которой упоминается проблема, но на самом деле она не решается (исключение появляется даже с задержкой).
Полный код для воспроизведения проблема:
https://dotnetfiddle.net/v7SgEV
//Needs the "System.Interactive.Async" nuget package
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
while (true) {
try {
using var cts = new CancellationTokenSource();
var secondaryDataSource = Channel.CreateUnbounded();
var reader = PrimaryDataSource(cts.Token).Do(x => {
Console.WriteLine("---");
});
var combinedSource = Merge(
reader,
secondaryDataSource.Reader.ReadAllAsync(cts.Token),
() => secondaryDataSource.Writer.TryComplete(),
cts.Token
);
await foreach (var item in combinedSource) {
Console.WriteLine(item ?? "");
throw new Exception();
}
} catch (NotSupportedException ex) {
throw;
} catch (Exception) {
}
}
static async IAsyncEnumerable PrimaryDataSource([EnumeratorCancellation] CancellationToken ct) {
while (true) {
await Task.Delay(Random.Shared.Next(100), ct).ConfigureAwait(false);
yield return "PrimaryDataSource";
throw new Exception();
yield break;
}
}
static async IAsyncEnumerable Merge(IAsyncEnumerable primary, IAsyncEnumerable secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);
await using var _pIt = pIt.ConfigureAwait(false);
await using var _sIt = sIt.ConfigureAwait(false);
Task? pItTask = null;
Task? sItTask = null;
IAsyncEnumerator? it = null;
while (true) {
Task task;
try {
pItTask ??= pIt.MoveNextAsync().AsTask();
sItTask ??= sIt.MoveNextAsync().AsTask();
task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
if (pItTask == task) {
pItTask = null;
it = pIt;
if (!task.Result) {
primaryFinished();
yield break;
}
} else {
sItTask = null;
it = sIt;
}
} catch (Exception) {
primaryFinished();
if (pItTask != null) await pItTask;
if (sItTask != null) await sItTask;
yield break;
}
if (task.Result) yield return it.Current;
}
}
Подробнее здесь: [url]https://stackoverflow.com/questions/79096778/merging-two-iasyncenumerable-instances-with-cancellation-support-notsupportedex[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия